From a17683fe4bd8826f8ff5145343e55896b407400d Mon Sep 17 00:00:00 2001 From: Ivan Rakov Date: Thu, 10 Aug 2017 13:31:23 +0300 Subject: [PATCH 1/8] IGNITE-5961 - Align pages in LFS partition files to pageSize (cherry picked from commit 3919d80) --- .../internal/pagemem/store/PageStore.java | 5 + .../cache/persistence/file/FilePageStore.java | 54 +++++--- .../file/FilePageStoreFactory.java | 35 ++++++ .../file/FilePageStoreManager.java | 17 +-- .../persistence/file/FilePageStoreV2.java | 53 ++++++++ .../file/FileVersionCheckingFactory.java | 116 ++++++++++++++++++ ...itePdsRecoveryAfterFileCorruptionTest.java | 2 +- 7 files changed, 251 insertions(+), 31 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreFactory.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreV2.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java index 4698a6b116fb0..f6e577ce3916e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java @@ -95,4 +95,9 @@ public interface PageStore { * @throws IgniteCheckedException If sync failed (IO error occurred). */ public void ensure() throws IgniteCheckedException; + + /** + * @return Page store version. 
+ */ + public int version(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java index a7ca13c2d1e62..7fe1ffe9ad104 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java @@ -45,10 +45,10 @@ public class FilePageStore implements PageStore { private static final long SIGNATURE = 0xF19AC4FE60C530B8L; /** File version. */ - private static final int VERSION = 1; + public static final int VERSION = 1; /** Allocated field offset. */ - public static final int HEADER_SIZE = 8/*SIGNATURE*/ + 4/*VERSION*/ + 1/*type*/ + 4/*page size*/; + static final int HEADER_SIZE = 8/*SIGNATURE*/ + 4/*VERSION*/ + 1/*type*/ + 4/*page size*/; /** */ private final File cfgFile; @@ -57,7 +57,7 @@ public class FilePageStore implements PageStore { private final byte type; /** Database configuration. */ - private final MemoryConfiguration dbCfg; + protected final MemoryConfiguration dbCfg; /** Factory to provide I/O interfaces for read/write operations with files */ private final FileIOFactory ioFactory; @@ -103,20 +103,36 @@ public FilePageStore(byte type, File file, FileIOFactory factory, MemoryConfigur /** {@inheritDoc} */ @Override public boolean exists() { - return cfgFile.exists() && cfgFile.length() > HEADER_SIZE; + return cfgFile.exists() && cfgFile.length() > headerSize(); } /** + * Size of page store header. + */ + public int headerSize() { + return HEADER_SIZE; + } + + /** + * Page store version. + */ + public int version() { + return VERSION; + } + + /** + * Creates header for current version file store. Doesn't init the store. + * * @param type Type. * @param pageSize Page size. * @return Byte buffer instance. 
*/ - public static ByteBuffer header(byte type, int pageSize) { - ByteBuffer hdr = ByteBuffer.allocate(HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN); + public ByteBuffer header(byte type, int pageSize) { + ByteBuffer hdr = ByteBuffer.allocate(headerSize()).order(ByteOrder.LITTLE_ENDIAN); hdr.putLong(SIGNATURE); - hdr.putInt(VERSION); + hdr.putInt(version()); hdr.put(type); @@ -142,7 +158,7 @@ private long initFile() { } //there is 'super' page in every file - return HEADER_SIZE + dbCfg.getPageSize(); + return headerSize() + dbCfg.getPageSize(); } /** @@ -150,7 +166,7 @@ private long initFile() { */ private long checkFile() throws IgniteCheckedException { try { - ByteBuffer hdr = ByteBuffer.allocate(HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN); + ByteBuffer hdr = ByteBuffer.allocate(headerSize()).order(ByteOrder.LITTLE_ENDIAN); while (hdr.remaining() > 0) fileIO.read(hdr); @@ -166,9 +182,9 @@ private long checkFile() throws IgniteCheckedException { int ver = hdr.getInt(); - if (VERSION != ver) + if (version() != ver) throw new IgniteCheckedException("Failed to verify store file (invalid file version)" + - " [expectedVersion=" + VERSION + + " [expectedVersion=" + version() + ", fileVersion=" + ver + "]"); byte type = hdr.get(); @@ -187,10 +203,10 @@ private long checkFile() throws IgniteCheckedException { long fileSize = cfgFile.length(); - if (fileSize == HEADER_SIZE) // Every file has a special meta page. - fileSize = pageSize + HEADER_SIZE; + if (fileSize == headerSize()) // Every file has a special meta page. 
+ fileSize = pageSize + headerSize(); - if ((fileSize - HEADER_SIZE) % pageSize != 0) + if ((fileSize - headerSize()) % pageSize != 0) throw new IgniteCheckedException("Failed to verify store file (invalid file size)" + " [fileSize=" + U.hexLong(fileSize) + ", pageSize=" + U.hexLong(pageSize) + ']'); @@ -346,9 +362,9 @@ public void finishRecover() { init(); try { - assert buf.remaining() == HEADER_SIZE; + assert buf.remaining() == headerSize(); - int len = HEADER_SIZE; + int len = headerSize(); long off = 0; @@ -425,7 +441,7 @@ private void init() throws IgniteCheckedException { long off = pageOffset(pageId); - assert (off >= 0 && off + pageSize <= allocated.get() + HEADER_SIZE) || recover : + assert (off >= 0 && off + pageSize <= allocated.get() + headerSize()) || recover : "off=" + U.hexLong(off) + ", allocated=" + U.hexLong(allocated.get()) + ", pageId=" + U.hexLong(pageId); assert pageBuf.capacity() == pageSize; @@ -463,7 +479,7 @@ private void init() throws IgniteCheckedException { /** {@inheritDoc} */ @Override public long pageOffset(long pageId) { - return (long) PageIdUtils.pageIndex(pageId) * pageSize + HEADER_SIZE; + return (long) PageIdUtils.pageIndex(pageId) * pageSize + headerSize(); } /** {@inheritDoc} */ @@ -494,7 +510,7 @@ private void init() throws IgniteCheckedException { long off = allocPage(); - return off / pageSize; + return (off - headerSize()) / pageSize; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreFactory.java new file mode 100644 index 0000000000000..d97ab26397a03 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreFactory.java @@ -0,0 +1,35 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. 
See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.ignite.internal.processors.cache.persistence.file; + +import java.io.File; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.PageIdAllocator; + +/** + * + */ +public interface FilePageStoreFactory { + /** + * Creates instance of FilePageStore based on given file. + * + * @param type Data type, can be {@link PageIdAllocator#FLAG_IDX} or {@link PageIdAllocator#FLAG_DATA}. + * @param file File Page store file. 
+ */ + public FilePageStore createPageStore(byte type, File file) throws IgniteCheckedException; +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java index e2ad070741e6f..0041ea61d4991 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java @@ -365,21 +365,16 @@ private CacheStoreHolder initForCache(CacheGroupDescriptor grpDesc, CacheConfigu if (dirExisted && !idxFile.exists()) grpsWithoutIdx.add(grpDesc.groupId()); - FilePageStore idxStore = new FilePageStore( - PageMemory.FLAG_IDX, - idxFile, - pstCfg.getFileIOFactory(), - cctx.kernalContext().config().getMemoryConfiguration()); + FileVersionCheckingFactory pageStoreFactory = new FileVersionCheckingFactory( + pstCfg.getFileIOFactory(), igniteCfg.getMemoryConfiguration()); + + FilePageStore idxStore = pageStoreFactory.createPageStore(PageMemory.FLAG_IDX, idxFile); FilePageStore[] partStores = new FilePageStore[grpDesc.config().getAffinity().partitions()]; for (int partId = 0; partId < partStores.length; partId++) { - FilePageStore partStore = new FilePageStore( - PageMemory.FLAG_DATA, - new File(cacheWorkDir, String.format(PART_FILE_TEMPLATE, partId)), - pstCfg.getFileIOFactory(), - cctx.kernalContext().config().getMemoryConfiguration() - ); + FilePageStore partStore = pageStoreFactory.createPageStore( + PageMemory.FLAG_DATA, new File(cacheWorkDir, String.format(PART_FILE_TEMPLATE, partId))); partStores[partId] = partStore; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreV2.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreV2.java new file mode 100644 index 0000000000000..5d044ec6b60e5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreV2.java @@ -0,0 +1,53 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.ignite.internal.processors.cache.persistence.file; + +import java.io.File; +import org.apache.ignite.configuration.MemoryConfiguration; + +/** + * + */ +public class FilePageStoreV2 extends FilePageStore { + /** File version. */ + public static final int VERSION = 2; + + /** Header size. */ + private final int hdrSize; + + /** + * @param type Type. + * @param file File. + * @param factory Factory. + * @param cfg Config. 
+ */ + public FilePageStoreV2(byte type, File file, FileIOFactory factory, MemoryConfiguration cfg) { + super(type, file, factory, cfg); + + hdrSize = cfg.getPageSize(); + } + + /** {@inheritDoc} */ + @Override public int headerSize() { + return hdrSize; + } + + /** {@inheritDoc} */ + @Override public int version() { + return VERSION; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java new file mode 100644 index 0000000000000..53bd802c77aae --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java @@ -0,0 +1,116 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.ignite.internal.processors.cache.persistence.file; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.MemoryConfiguration; + +/** + * Checks version in files if it's present on the disk, creates store with latest version otherwise. + */ +public class FileVersionCheckingFactory implements FilePageStoreFactory { + /** Property to override latest version. Should be used only in tests. */ + public static final String LATEST_VERSION_OVERRIDE_PROPERTY = "file.page.store.latest.version.override"; + + /** Latest page store version. */ + public final static int LATEST_VERSION = 2; + + /** Factory to provide I/O interfaces for read/write operations with files. */ + private final FileIOFactory fileIOFactory; + + /** Memory configuration. */ + private final MemoryConfiguration memCfg; + + /** + * @param fileIOFactory File io factory. + * @param memCfg Memory configuration. 
+ */ + public FileVersionCheckingFactory( + FileIOFactory fileIOFactory, MemoryConfiguration memCfg) { + this.fileIOFactory = fileIOFactory; + this.memCfg = memCfg; + } + + /** {@inheritDoc} */ + @Override public FilePageStore createPageStore(byte type, File file) throws IgniteCheckedException { + if (!file.exists()) + return createPageStore(type, file, latestVersion()); + + try (FileIO fileIO = fileIOFactory.create(file, "r")) { + int minHdr = FilePageStore.HEADER_SIZE; + + if (fileIO.size() < minHdr) + return createPageStore(type, file, latestVersion()); + + ByteBuffer hdr = ByteBuffer.allocate(minHdr).order(ByteOrder.LITTLE_ENDIAN); + + while (hdr.remaining() > 0) + fileIO.read(hdr); + + hdr.rewind(); + + hdr.getLong(); // Read signature + + int ver = hdr.getInt(); + + return createPageStore(type, file, ver); + } + catch (IOException e) { + throw new IgniteCheckedException("Error while creating file page store [file=" + file + "]:", e); + } + } + + /** + * Resolves latest page store version. + */ + public int latestVersion() { + int latestVer = LATEST_VERSION; + + try { + latestVer = Integer.parseInt(System.getProperty(LATEST_VERSION_OVERRIDE_PROPERTY)); + } catch (NumberFormatException e) { + // No override. + } + + return latestVer; + } + + /** + * Instantiates specific version of FilePageStore. + * + * @param type Type. + * @param file File. + * @param ver Version. 
+ */ + public FilePageStore createPageStore(byte type, File file, int ver) throws IgniteCheckedException { + switch (ver) { + case FilePageStore.VERSION: + return new FilePageStore(type, file, fileIOFactory, memCfg); + + case FilePageStoreV2.VERSION: + return new FilePageStoreV2(type, file, fileIOFactory, memCfg); + + default: + throw new IllegalArgumentException("Unknown version of file page store: " + ver); + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java index c248c35d19020..11d5eef9926ba 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java @@ -194,7 +194,7 @@ private void eraseDataFromDisk( long size = fileIO.size(); - fileIO.write(ByteBuffer.allocate((int)size - FilePageStore.HEADER_SIZE), FilePageStore.HEADER_SIZE); + fileIO.write(ByteBuffer.allocate((int)size - filePageStore.headerSize()), filePageStore.headerSize()); fileIO.force(); } From b96659f54c43bed96f8b1e6c9ceb67a6dbebc0ac Mon Sep 17 00:00:00 2001 From: Alexey Goncharuk Date: Thu, 10 Aug 2017 18:13:17 +0300 Subject: [PATCH 2/8] IGNITE-5961 - Fixed pages() method for FilePageStore (cherry picked from commit 7199037) --- .../processors/cache/persistence/file/FilePageStore.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java index 7fe1ffe9ad104..e6c5379a4b5b9 100755 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java @@ -535,6 +535,6 @@ private long allocPage() { if (!inited) return 0; - return (int)(allocated.get() / pageSize); + return (int)((allocated.get() - headerSize()) / pageSize); /* Divide in long arithmetic first; narrowing before the division truncates byte counts above 2 GiB. */ } } From 1808393b1c870b0c089cc495f070fcb75202d65d Mon Sep 17 00:00:00 2001 From: Ivan Rakov Date: Thu, 17 Aug 2017 15:54:21 +0300 Subject: [PATCH 3/8] IGNITE-6033 Added sorted and multithreaded modes in checkpointing algorithm - Fixes #2441. Signed-off-by: Alexey Goncharuk (cherry picked from commit 69e6f8b) --- .../configuration/CheckpointWriteOrder.java | 33 ++++++++ .../PersistentStoreConfiguration.java | 26 ++++++ .../GridCacheDatabaseSharedManager.java | 82 ++++++++++++++----- ...tePersistenceSequentialCheckpointTest.java | 44 ++++++++++ .../IgnitePersistentStoreCacheGroupsTest.java | 31 ++++--- 5 files changed, 183 insertions(+), 33 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java b/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java new file mode 100644 index 0000000000000..31feaf6f88d5c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java @@ -0,0 +1,33 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. 
+* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.ignite.configuration; + +/** + * This enum defines order of writing pages to disk storage during checkpoint. + */ +public enum CheckpointWriteOrder { + /** + * Pages are written in order provided by checkpoint pages collection iterator (which is basically a hashtable). + */ + RANDOM, + + /** + * All checkpoint pages are collected into single list and sorted by page index. + * Provides almost sequential disk writes, which can be much faster on some SSD models. + */ + SEQUENTIAL +} diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java index e8a0ff47e7015..5b902ac1e3adc 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java @@ -47,6 +47,9 @@ public class PersistentStoreConfiguration implements Serializable { /** Default number of checkpointing threads. */ public static final int DFLT_CHECKPOINTING_THREADS = 1; + /** Default checkpoint write order. 
*/ + public static final CheckpointWriteOrder DFLT_CHECKPOINT_WRITE_ORDER = CheckpointWriteOrder.RANDOM; + /** Default number of checkpoints to be kept in WAL after checkpoint is finished */ public static final int DFLT_WAL_HISTORY_SIZE = 20; @@ -95,6 +98,9 @@ public class PersistentStoreConfiguration implements Serializable { /** */ private int checkpointingThreads = DFLT_CHECKPOINTING_THREADS; + /** Checkpoint write order. */ + private CheckpointWriteOrder checkpointWriteOrder = DFLT_CHECKPOINT_WRITE_ORDER; + /** Number of checkpoints to keep */ private int walHistSize = DFLT_WAL_HISTORY_SIZE; @@ -587,6 +593,26 @@ public long getWalAutoArchiveAfterInactivity() { return walAutoArchiveAfterInactivity; } + /** + * This property defines order of writing pages to disk storage during checkpoint. + * + * @return Checkpoint write order. + */ + public CheckpointWriteOrder getCheckpointWriteOrder() { + return checkpointWriteOrder; + } + + /** + * This property defines order of writing pages to disk storage during checkpoint. + * + * @param checkpointWriteOrder Checkpoint write order. 
+ */ + public PersistentStoreConfiguration setCheckpointWriteOrder(CheckpointWriteOrder checkpointWriteOrder) { + this.checkpointWriteOrder = checkpointWriteOrder; + + return this; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(PersistentStoreConfiguration.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index d147f3699cf0d..741ab9f8459ce 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -49,7 +49,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -65,6 +64,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.PersistenceMetrics; +import org.apache.ignite.configuration.CheckpointWriteOrder; import org.apache.ignite.configuration.DataPageEvictionMode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.MemoryConfiguration; @@ -96,7 +96,6 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import 
org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -137,6 +136,7 @@ import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.mxbean.PersistenceMetricsMXBean; import org.apache.ignite.thread.IgniteThread; +import org.apache.ignite.thread.IgniteThreadPoolExecutor; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -384,11 +384,12 @@ private void initDataBase() { long cpBufSize = persistenceCfg.getCheckpointingPageBufferSize(); if (persistenceCfg.getCheckpointingThreads() > 1) - asyncRunner = new ThreadPoolExecutor( + asyncRunner = new IgniteThreadPoolExecutor( + "checkpoint-runner", + cctx.igniteInstanceName(), persistenceCfg.getCheckpointingThreads(), persistenceCfg.getCheckpointingThreads(), - 30L, - TimeUnit.SECONDS, + 30_000, new LinkedBlockingQueue() ); @@ -2082,10 +2083,10 @@ private Checkpoint markCheckpointBegin(CheckpointMetricsTracker tracker) throws WALPointer cpPtr = null; - GridMultiCollectionWrapper cpPages; - final CheckpointProgress curr; + IgniteBiTuple>, Integer> cpPagesTuple; + tracker.onLockWaitStart(); checkpointLock.writeLock().lock(); @@ -2150,19 +2151,9 @@ private Checkpoint markCheckpointBegin(CheckpointMetricsTracker tracker) throws if (curr.nextSnapshot) snapshotMgr.onMarkCheckPointBegin(curr.snapshotOperation, map); - IgniteBiTuple>, Integer> tup = beginAllCheckpoints(); + cpPagesTuple = beginAllCheckpoints(); - // Todo it maybe more optimally - Collection cpPagesList = new ArrayList<>(tup.get2()); - - for (GridMultiCollectionWrapper col : tup.get1()) { - for (int i = 0; i < col.collectionsSize(); i++) - cpPagesList.addAll(col.innerCollection(i)); - } - - cpPages = new GridMultiCollectionWrapper<>(cpPagesList); - - if (!F.isEmpty(cpPages)) { + if (!F.isEmpty(cpPagesTuple.get1())) { // No page updates for this checkpoint are allowed 
from now on. cpPtr = cctx.wal().log(cpRec); @@ -2178,7 +2169,7 @@ private Checkpoint markCheckpointBegin(CheckpointMetricsTracker tracker) throws curr.cpBeginFut.onDone(); - if (!F.isEmpty(cpPages)) { + if (!F.isEmpty(cpPagesTuple.get1())) { assert cpPtr != null; // Sync log outside the checkpoint write lock. @@ -2196,6 +2187,8 @@ private Checkpoint markCheckpointBegin(CheckpointMetricsTracker tracker) throws checkpointHist.addCheckpointEntry(cpEntry); + GridMultiCollectionWrapper cpPages = splitAndSortCpPagesIfNeeded(cpPagesTuple); + if (printCheckpointStats) if (log.isInfoEnabled()) log.info(String.format("Checkpoint started [checkpointId=%s, startPtr=%s, checkpointLockWait=%dms, " + @@ -2293,6 +2286,55 @@ public void shutdownNow() { } } + /** + * Reorders list of checkpoint pages and splits them into needed number of sublists according to + * {@link PersistentStoreConfiguration#getCheckpointingThreads()} and + * {@link PersistentStoreConfiguration#getCheckpointWriteOrder()}. + * + * @param cpPagesTuple Checkpoint pages tuple. + */ + private GridMultiCollectionWrapper splitAndSortCpPagesIfNeeded( + IgniteBiTuple>, Integer> cpPagesTuple) { + List cpPagesList = new ArrayList<>(cpPagesTuple.get2()); + + for (GridMultiCollectionWrapper col : cpPagesTuple.get1()) { + for (int i = 0; i < col.collectionsSize(); i++) + cpPagesList.addAll(col.innerCollection(i)); + } + + if (persistenceCfg.getCheckpointWriteOrder() == CheckpointWriteOrder.SEQUENTIAL) { + Collections.sort(cpPagesList, new Comparator() { + @Override public int compare(FullPageId o1, FullPageId o2) { + int cmp = Long.compare(o1.groupId(), o2.groupId()); + if (cmp != 0) + return cmp; + + return Long.compare(PageIdUtils.effectivePageId(o1.pageId()), + PageIdUtils.effectivePageId(o2.pageId())); + } + }); + } + + int cpThreads = persistenceCfg.getCheckpointingThreads(); + + int pagesSubLists = cpThreads == 1 ? 1 : cpThreads * 4; + // Splitting pages to (threads * 4) subtasks. 
If any thread will be faster, it will help slower threads. + + Collection[] pagesSubListArr = new Collection[pagesSubLists]; + + for (int i = 0; i < pagesSubLists; i++) { + int totalSize = cpPagesList.size(); + + int from = totalSize * i / (pagesSubLists); + + int to = totalSize * (i + 1) / (pagesSubLists); + + pagesSubListArr[i] = cpPagesList.subList(from, to); + } + + return new GridMultiCollectionWrapper(pagesSubListArr); + } + /** Pages write task */ private class WriteCheckpointPages implements Runnable { /** */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java new file mode 100644 index 0000000000000..92950002d5f77 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java @@ -0,0 +1,44 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.ignite.internal.processors.cache.persistence; + +import org.apache.ignite.configuration.CheckpointWriteOrder; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.PersistentStoreConfiguration; +import org.apache.ignite.configuration.WALMode; + +/** + * + */ +public class IgnitePersistenceSequentialCheckpointTest extends IgnitePersistentStoreCacheGroupsTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setPersistentStoreConfiguration(new PersistentStoreConfiguration() + .setWalMode(WALMode.LOG_ONLY) + .setCheckpointingThreads(4) + .setCheckpointWriteOrder(CheckpointWriteOrder.SEQUENTIAL)); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected int entriesCount() { + return 1000; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java index a945c73ed347f..b39b8cb6b6f9b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java @@ -87,7 +87,7 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest MemoryConfiguration memCfg = new MemoryConfiguration(); memCfg.setPageSize(1024); - memCfg.setDefaultMemoryPolicySize(10 * 1024 * 1024); + memCfg.setDefaultMemoryPolicySize(100 * 1024 * 1024); cfg.setMemoryConfiguration(memCfg); @@ -115,6 +115,11 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest super.afterTest(); } + /** Entries count. 
*/ + protected int entriesCount() { + return 10; + } + /** * @throws Exception If failed. */ @@ -236,7 +241,7 @@ public void _testExpiryPolicy() throws Exception { for (String cacheName : caches) { IgniteCache cache = node.cache(cacheName).withExpiryPolicy(plc); - for (int i = 0; i < 10; i++) + for (int i = 0; i < entriesCount(); i++) cache.put(i, cacheName + i); } @@ -253,10 +258,10 @@ public void _testExpiryPolicy() throws Exception { for (String cacheName : caches) { IgniteCache cache = node.cache(cacheName); - for (int i = 0; i < 10; i++) + for (int i = 0; i < entriesCount(); i++) assertEquals(cacheName + i, cache.get(i)); - assertEquals(10, cache.size()); + assertEquals(entriesCount(), cache.size()); } // Wait for expiration. @@ -340,7 +345,7 @@ private void putPersons(String[] caches, Ignite node) { for (String cacheName : caches) { IgniteCache cache = node.cache(cacheName); - for (int i = 0; i < 10; i++) + for (int i = 0; i < entriesCount(); i++) cache.put(i, new Person("" + i, cacheName)); } } @@ -353,10 +358,10 @@ private void checkPersons(String[] caches, Ignite node) { for (String cacheName : caches) { IgniteCache cache = node.cache(cacheName); - for (int i = 0; i < 10; i++) + for (int i = 0; i < entriesCount(); i++) assertEquals(new Person("" + i, cacheName), cache.get(i)); - assertEquals(10, cache.size()); + assertEquals(entriesCount(), cache.size()); } } @@ -373,10 +378,10 @@ private void checkPersonsQuery(String[] caches, Ignite node) { List> persons = cache.query(qry.setArgs(cacheName)).getAll(); - for (int i = 0; i < 10; i++) + for (int i = 0; i < entriesCount(); i++) assertEquals(new Person("" + i, cacheName), persons.get(i).getValue()); - assertEquals(10, persons.size()); + assertEquals(entriesCount(), persons.size()); } } @@ -413,13 +418,13 @@ private void clusterRestart(int nodes, boolean staticCaches) throws Exception { for (String cacheName : caches) { IgniteCache cache = node.cache(cacheName); - for (int i = 0; i < 10; i++) { + for (int i = 
0; i < entriesCount(); i++) { cache.put(i, cacheName + i); assertEquals(cacheName + i, cache.get(i)); } - assertEquals(10, cache.size()); + assertEquals(entriesCount(), cache.size()); } stopAllGrids(); @@ -433,10 +438,10 @@ private void clusterRestart(int nodes, boolean staticCaches) throws Exception { for (String cacheName : caches) { IgniteCache cache = node.cache(cacheName); - for (int i = 0; i < 10; i++) + for (int i = 0; i < entriesCount(); i++) assertEquals(cacheName + i, cache.get(i)); - assertEquals(10, cache.size()); + assertEquals(entriesCount(), cache.size()); } } From 6423eb535e791f7c0b81ac2dddcbf5b3a73b9f11 Mon Sep 17 00:00:00 2001 From: Dmitriy Govorukhin Date: Tue, 22 Aug 2017 16:34:31 +0300 Subject: [PATCH 4/8] IGNITE-6154 fix incorrect check checkpoint pages (cherry picked from commit 474ecb8) --- .../GridCacheDatabaseSharedManager.java | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 741ab9f8459ce..b10ebdb09fbcd 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -2169,7 +2169,7 @@ private Checkpoint markCheckpointBegin(CheckpointMetricsTracker tracker) throws curr.cpBeginFut.onDone(); - if (!F.isEmpty(cpPagesTuple.get1())) { + if (hasPageForWrite(cpPagesTuple.get1())) { assert cpPtr != null; // Sync log outside the checkpoint write lock. @@ -2219,6 +2219,24 @@ private Checkpoint markCheckpointBegin(CheckpointMetricsTracker tracker) throws } } + /** + * Check that at least one collection is not empty. 
+ * + * @param cpPagesCollWrapper Collection of {@link GridMultiCollectionWrapper} checkpoint pages. + */ + private boolean hasPageForWrite(Collection> cpPagesCollWrapper) { + boolean hasPages = false; + + for (Collection c : cpPagesCollWrapper) + if (!c.isEmpty()) { + hasPages = true; + + break; + } + + return hasPages; + } + /** * @return tuple with collections of FullPageIds obtained from each PageMemory and overall number of dirty pages. */ @@ -2294,7 +2312,8 @@ public void shutdownNow() { * @param cpPagesTuple Checkpoint pages tuple. */ private GridMultiCollectionWrapper splitAndSortCpPagesIfNeeded( - IgniteBiTuple>, Integer> cpPagesTuple) { + IgniteBiTuple>, Integer> cpPagesTuple + ) { List cpPagesList = new ArrayList<>(cpPagesTuple.get2()); for (GridMultiCollectionWrapper col : cpPagesTuple.get1()) { From 7b9777eae0a821571b412ef923c77d129b92189e Mon Sep 17 00:00:00 2001 From: Ivan Rakov Date: Tue, 22 Aug 2017 17:03:42 +0300 Subject: [PATCH 5/8] IGNITE-6154 also fixed check for WAL record (cherry picked from commit fa42218) --- .../cache/persistence/GridCacheDatabaseSharedManager.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index b10ebdb09fbcd..eac0b91f71930 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -2089,6 +2089,8 @@ private Checkpoint markCheckpointBegin(CheckpointMetricsTracker tracker) throws tracker.onLockWaitStart(); + boolean hasPages; + checkpointLock.writeLock().lock(); try { @@ -2153,7 +2155,9 @@ private Checkpoint markCheckpointBegin(CheckpointMetricsTracker 
tracker) throws cpPagesTuple = beginAllCheckpoints(); - if (!F.isEmpty(cpPagesTuple.get1())) { + hasPages = hasPageForWrite(cpPagesTuple.get1()); + + if (hasPages) { // No page updates for this checkpoint are allowed from now on. cpPtr = cctx.wal().log(cpRec); @@ -2169,7 +2173,7 @@ private Checkpoint markCheckpointBegin(CheckpointMetricsTracker tracker) throws curr.cpBeginFut.onDone(); - if (hasPageForWrite(cpPagesTuple.get1())) { + if (hasPages) { assert cpPtr != null; // Sync log outside the checkpoint write lock. From fd0aa3b7453f9cd8e3451ddd0c2d14cde586e793 Mon Sep 17 00:00:00 2001 From: Ivan Rakov Date: Thu, 24 Aug 2017 18:18:31 +0300 Subject: [PATCH 6/8] IGNITE-6178 Make CheckpointWriteOrder.SEQUENTIAL and checkpointingThreads=4 default in persistent store confguration (cherry picked from commit 6f279b0) --- .../ignite/configuration/PersistentStoreConfiguration.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java index 5b902ac1e3adc..888bf42c7e3b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java @@ -16,12 +16,11 @@ */ package org.apache.ignite.configuration; +import java.io.Serializable; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; import org.apache.ignite.internal.util.typedef.internal.S; -import java.io.Serializable; - /** * Configures Apache Ignite Persistent store. */ @@ -45,10 +44,10 @@ public class PersistentStoreConfiguration implements Serializable { public static final int DFLT_RATE_TIME_INTERVAL_MILLIS = 60_000; /** Default number of checkpointing threads. 
*/ - public static final int DFLT_CHECKPOINTING_THREADS = 1; + public static final int DFLT_CHECKPOINTING_THREADS = 4; /** Default checkpoint write order. */ - public static final CheckpointWriteOrder DFLT_CHECKPOINT_WRITE_ORDER = CheckpointWriteOrder.RANDOM; + public static final CheckpointWriteOrder DFLT_CHECKPOINT_WRITE_ORDER = CheckpointWriteOrder.SEQUENTIAL; /** Default number of checkpoints to be kept in WAL after checkpoint is finished */ public static final int DFLT_WAL_HISTORY_SIZE = 20; From 958948d973962b7bb121fff79d5d200a7074874e Mon Sep 17 00:00:00 2001 From: Ivan Rakov Date: Fri, 25 Aug 2017 20:09:43 +0300 Subject: [PATCH 7/8] IGNITE-6183 Make "Node crashed in the middle of checkpoint" message softer and more informative Signed-off-by: Andrey Gura (cherry picked from commit a1fab62) --- .../cache/persistence/GridCacheDatabaseSharedManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index eac0b91f71930..8bf0c4b70f757 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -1302,8 +1302,8 @@ private WALPointer restoreMemory(CheckpointStatus status) throws IgniteCheckedEx boolean apply = status.needRestoreMemory(); if (apply) { - U.quietAndWarn(log, "Ignite node crashed in the middle of checkpoint. Will restore memory state and " + - "enforce checkpoint on node start."); + U.quietAndWarn(log, "Ignite node stopped in the middle of checkpoint. 
Will restore memory state and " + + "finish checkpoint on node start."); cctx.pageStore().beginRecover(); } From 56a3acc905fda1176ff94bb0f5c499d278a99155 Mon Sep 17 00:00:00 2001 From: Ivan Rakov Date: Mon, 28 Aug 2017 18:16:11 +0300 Subject: [PATCH 8/8] IGNITE-6204 CE fix --- .../cache/persistence/GridCacheDatabaseSharedManager.java | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 8bf0c4b70f757..351ec7101ee58 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -96,6 +96,7 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheContext;