From cc8cc25a81dc98f02212dd6b499b5e030426d0dd Mon Sep 17 00:00:00 2001 From: Ivan Rakov Date: Thu, 21 Sep 2017 11:16:00 +0300 Subject: [PATCH 1/4] IGNITE-6334 Throttle writing threads during ongoing checkpoint with token bucket algorithm --- .../PersistentStoreConfiguration.java | 28 +- .../GridCacheDatabaseSharedManager.java | 43 ++- .../persistence/pagemem/PageMemoryImpl.java | 98 +++++- .../pagemem/PagesWriteThrottle.java | 104 ++++++ .../pagemem/BPlusTreePageMemoryImplTest.java | 4 +- .../BPlusTreeReuseListPageMemoryImplTest.java | 3 +- .../MetadataStoragePageMemoryImplTest.java | 4 +- .../pagemem/PageMemoryImplNoLoadTest.java | 4 +- .../pagemem/PageMemoryImplTest.java | 4 +- .../PagesWriteThrottleSandboxTest.java | 264 ++++++++++++++ .../pagemem/PagesWriteThrottleSmokeTest.java | 322 ++++++++++++++++++ .../ignite/testsuites/IgnitePdsTestSuite.java | 4 + 12 files changed, 858 insertions(+), 24 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java index 888bf42c7e3b6..298f1e0f5a6bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java @@ -82,6 +82,9 @@ public class PersistentStoreConfiguration implements Serializable { /** Default wal archive directory. */ public static final String DFLT_WAL_ARCHIVE_PATH = "db/wal/archive"; + /** Default write throttling enabled. */ + public static final boolean DFLT_WRITE_THROTTLING_ENABLED = false; + /** */ private String persistenceStorePath; @@ -157,6 +160,11 @@ public class PersistentStoreConfiguration implements Serializable { */ private long walAutoArchiveAfterInactivity = -1; + /** + * If true, threads that generate dirty pages too fast during ongoing checkpoint will be throttled. + */ + private boolean writeThrottlingEnabled = DFLT_WRITE_THROTTLING_ENABLED; + /** * Returns a path the root directory where the Persistent Store will persist data and indexes. */ @@ -236,7 +244,7 @@ public int getCheckpointingThreads() { /** * Sets a number of threads to use for the checkpointing purposes. * - * @param checkpointingThreads Number of checkpointing threads. One thread is used by default. + * @param checkpointingThreads Number of checkpointing threads. Four threads are used by default. * @return {@code this} for chaining. */ public PersistentStoreConfiguration setCheckpointingThreads(int checkpointingThreads) { @@ -397,6 +405,24 @@ public PersistentStoreConfiguration setMetricsEnabled(boolean metricsEnabled) { return this; } + /** + * Gets flag indicating whether write throttling is enabled. + */ + public boolean isWriteThrottlingEnabled() { + return writeThrottlingEnabled; + } + + /** + * Sets flag indicating whether write throttling is enabled. + * + * @param writeThrottlingEnabled Write throttling enabled flag. + */ + public PersistentStoreConfiguration setWriteThrottlingEnabled(boolean writeThrottlingEnabled) { + this.writeThrottlingEnabled = writeThrottlingEnabled; + + return this; + } + /** * Gets the length of the time interval for rate-based metrics. This interval defines a window over which * hits will be tracked. Default value is {@link #DFLT_RATE_TIME_INTERVAL_MILLIS}. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 5a772b5bd35f8..16099451deb9f 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -300,6 +300,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** */ private ObjectName persistenceMetricsMbeanName; + /** Counter for written checkpoint pages. Not null only if checkpoint is running. */ + private volatile AtomicInteger writtenPagesCntr = null; + + /** Number of pages in current checkpoint. */ + private volatile int currCheckpointPagesCnt; + /** * @param ctx Kernal context. */ @@ -698,7 +704,8 @@ private long[] calculateFragmentSizes(int concLvl, long cacheSize, long chpBufSi } }, this, - memMetrics + memMetrics, + persistenceCfg.isWriteThrottlingEnabled() ); memMetrics.pageMemory(pageMem); @@ -941,7 +948,7 @@ private void shutdownCheckpointer(boolean cancel) { } /** - * Gets the checkpoint read lock. While this lock is held, checkpoint thread will not acquiSnapshotWorkerre memory + * Gets the checkpoint read lock. While this lock is held, checkpoint thread will not acquireSnapshotWorker memory * state. */ @SuppressWarnings("LockAcquiredButNotSafelyReleased") @@ -1904,6 +1911,20 @@ private CheckpointEntry writeCheckpointEntry( } } + /** + * Counter for written checkpoint pages. Not null only if checkpoint is running. + */ + public AtomicInteger writtenPagesCounter() { + return writtenPagesCntr; + } + + /** + * @return Number of pages in current checkpoint. If checkpoint is not running, returns 0. + */ + public int currentCheckpointPagesCount() { + return currCheckpointPagesCnt; + } + /** * @param cpTs Checkpoint timestamp. * @param cpId Checkpoint ID. @@ -2037,6 +2058,10 @@ private void doCheckpoint() { Checkpoint chp = markCheckpointBegin(tracker); + currCheckpointPagesCnt = chp.pagesSize; + + writtenPagesCntr = new AtomicInteger(); + boolean interrupted = true; try { @@ -2048,7 +2073,7 @@ private void doCheckpoint() { asyncRunner == null ? 1 : chp.cpPages.collectionsSize()); tracker.onPagesWriteStart(); - final AtomicInteger writtenPagesCtr = new AtomicInteger(); + final int totalPagesToWriteCnt = chp.cpPages.size(); if (asyncRunner != null) { @@ -2058,7 +2083,6 @@ private void doCheckpoint() { chp.cpPages.innerCollection(i), updStores, doneWriteFut, - writtenPagesCtr, totalPagesToWriteCnt ); @@ -2077,7 +2101,6 @@ private void doCheckpoint() { chp.cpPages, updStores, doneWriteFut, - writtenPagesCtr, totalPagesToWriteCnt); write.run(); @@ -2401,6 +2424,10 @@ private void markCheckpointEnd(Checkpoint chp) throws IgniteCheckedException { chp.cpEntry.checkpointMark(), null, CheckpointEntryType.END); + + writtenPagesCntr = null; + + currCheckpointPagesCnt = 0; } checkpointHist.onCheckpointFinished(chp); @@ -2497,9 +2524,6 @@ private class WriteCheckpointPages implements Runnable { /** */ private CountDownFuture doneFut; - /** Counter for all written pages. May be shared between several workers */ - private AtomicInteger writtenPagesCntr; - /** Total pages to write, counter may be greater than {@link #writePageIds} size */ private final int totalPagesToWrite; @@ -2510,7 +2534,6 @@ private class WriteCheckpointPages implements Runnable { * @param writePageIds Collection of page IDs to write. * @param updStores * @param doneFut - * @param writtenPagesCntr all written pages counter, may be shared between several write tasks * @param totalPagesToWrite total pages to be written under this checkpoint */ private WriteCheckpointPages( @@ -2518,13 +2541,11 @@ private WriteCheckpointPages( final Collection writePageIds, final GridConcurrentHashSet updStores, final CountDownFuture doneFut, - @NotNull final AtomicInteger writtenPagesCntr, final int totalPagesToWrite) { this.tracker = tracker; this.writePageIds = writePageIds; this.updStores = updStores; this.doneFut = doneFut; - this.writtenPagesCntr = writtenPagesCntr; this.totalPagesToWrite = totalPagesToWrite; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java index dbb64f8c22a50..61c53aa13eb26 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java @@ -33,6 +33,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -56,6 +57,7 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.MemoryMetricsImpl; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.TrackingPageIO; @@ -179,6 +181,9 @@ public class PageMemoryImpl implements PageMemoryEx { /** State checker. */ private final CheckpointLockStateChecker stateChecker; + /** Number of used pages in checkpoint buffer. */ + private final AtomicInteger cpBufPagesCntr = new AtomicInteger(0); + /** */ private ExecutorService asyncRunner = new ThreadPoolExecutor( 0, @@ -217,6 +222,12 @@ public class PageMemoryImpl implements PageMemoryEx { /** Flush dirty page closure. When possible, will be called by evictPage(). */ private final GridInClosure3X changeTracker; + /** Pages write throttle. */ + private PagesWriteThrottle writeThrottle; + + /** Write throttle enabled flag. */ + private boolean throttleEnabled; + /** */ private boolean pageEvictWarned; @@ -232,6 +243,7 @@ public class PageMemoryImpl implements PageMemoryEx { * @param pageSize Page size. * @param flushDirtyPage Callback invoked when a dirty page is evicted. * @param changeTracker Callback invoked to track changes in pages. + * @param throttleEnabled Write throttle enabled flag. */ public PageMemoryImpl( DirectMemoryProvider directMemoryProvider, @@ -241,7 +253,8 @@ public PageMemoryImpl( GridInClosure3X flushDirtyPage, GridInClosure3X changeTracker, CheckpointLockStateChecker stateChecker, - MemoryMetricsImpl memMetrics + MemoryMetricsImpl memMetrics, + boolean throttleEnabled ) { assert sharedCtx != null; @@ -253,6 +266,7 @@ public PageMemoryImpl( this.flushDirtyPage = flushDirtyPage; this.changeTracker = changeTracker; this.stateChecker = stateChecker; + this.throttleEnabled = throttleEnabled; storeMgr = sharedCtx.pageStore(); walMgr = sharedCtx.wal(); @@ -290,7 +304,7 @@ public PageMemoryImpl( DirectMemoryRegion cpReg = regions.get(regs - 1); - checkpointPool = new PagePool(regs - 1, cpReg); + checkpointPool = new PagePool(regs - 1, cpReg, cpBufPagesCntr); long checkpointBuf = cpReg.size(); @@ -305,12 +319,16 @@ public PageMemoryImpl( totalAllocated += reg.size(); - segments[i] = new Segment(i, regions.get(i), checkpointPool.pages() / segments.length); + boolean throttleEnabled = sharedCtx.gridConfig().getPersistentStoreConfiguration().isWriteThrottlingEnabled(); + + segments[i] = new Segment(i, regions.get(i), checkpointPool.pages() / segments.length, throttleEnabled); pages += segments[i].pages(); totalTblSize += segments[i].tableSize(); } + initWriteThrottle(); + if (log.isInfoEnabled()) log.info("Started page memory [memoryAllocated=" + U.readableSize(totalAllocated, false) + ", pages=" + pages + @@ -319,6 +337,21 @@ public PageMemoryImpl( ']'); } + /** + * + */ + private void initWriteThrottle() { + if (!(sharedCtx.database() instanceof GridCacheDatabaseSharedManager)) { + log.error("Write throttle can't start. Unexpected class of database manager: " + + sharedCtx.database().getClass()); + + throttleEnabled = false; + } + + if (throttleEnabled) + writeThrottle = new PagesWriteThrottle(this, (GridCacheDatabaseSharedManager)sharedCtx.database()); + } + /** {@inheritDoc} */ @SuppressWarnings("OverlyStrongTypeCast") @Override public void stop() throws IgniteException { @@ -774,6 +807,18 @@ private void tryToRestorePage(FullPageId fullId, long absPtr) throws IgniteCheck return true; } + /** + * @param dirtyRatioThreshold Throttle threshold. + */ + boolean shouldThrottle(double dirtyRatioThreshold) { + for (Segment segment : segments) { + if (segment.shouldThrottle(dirtyRatioThreshold)) + return true; + } + + return false; + } + /** {@inheritDoc} */ @Override public GridMultiCollectionWrapper beginCheckpoint() throws IgniteException { Collection[] collections = new Collection[segments.length]; @@ -799,6 +844,9 @@ private void tryToRestorePage(FullPageId fullId, long absPtr) throws IgniteCheck @Override public void finishCheckpoint() { for (Segment seg : segments) seg.segCheckpointPages = null; + + if (throttleEnabled) + writeThrottle.onFinishCheckpoint(); } /** {@inheritDoc} */ @@ -1219,6 +1267,9 @@ private void writeUnlockPage( try { rwLock.writeUnlock(page + PAGE_LOCK_OFFSET, PageIdUtils.tag(pageId)); + + if (throttleEnabled && !restore && markDirty && !dirty) + writeThrottle.onMarkDirty(isInCheckpoint(fullId)); } catch (AssertionError ex) { StringBuilder sb = new StringBuilder(sysPageSize * 2); @@ -1309,6 +1360,20 @@ public int activePagesCount() { return total; } + /** + * Number of used pages in checkpoint buffer. + */ + public int checkpointBufferPagesCount() { + return cpBufPagesCntr.get(); + } + + /** + * Number of used pages in checkpoint buffer. + */ + public int checkpointBufferPagesSize() { + return checkpointPool.pages(); + } + /** * This method must be called in synchronized context. * @@ -1385,6 +1450,9 @@ private class PagePool { /** Direct memory region. */ protected final DirectMemoryRegion region; + /** Pool pages counter. */ + protected final AtomicInteger pagesCntr; + /** */ protected long lastAllocatedIdxPtr; @@ -1397,10 +1465,12 @@ private class PagePool { /** * @param idx Index. * @param region Region + * @param pagesCntr Pages counter. */ - protected PagePool(int idx, DirectMemoryRegion region) { + protected PagePool(int idx, DirectMemoryRegion region, AtomicInteger pagesCntr) { this.idx = idx; this.region = region; + this.pagesCntr = pagesCntr; long base = (region.address() + 7) & ~0x7; @@ -1427,6 +1497,9 @@ protected PagePool(int idx, DirectMemoryRegion region) { * @throws GridOffHeapOutOfMemoryException If failed to allocate new free page. */ private long borrowOrAllocateFreePage(long pageId) throws GridOffHeapOutOfMemoryException { + if (pagesCntr != null) + pagesCntr.getAndIncrement(); + long relPtr = borrowFreePage(); return relPtr != INVALID_REL_PTR ? relPtr : allocateFreePage(pageId); @@ -1500,6 +1573,9 @@ private void releaseFreePage(long relPtr) { assert !PageHeader.isAcquired(absPtr) : "Release pinned page: " + PageHeader.fullPageId(absPtr); + if (pagesCntr != null) + pagesCntr.getAndDecrement(); + while (true) { long freePageRelPtrMasked = GridUnsafe.getLong(freePageListPtr); @@ -1580,8 +1656,9 @@ private class Segment extends ReentrantReadWriteLock { /** * @param region Memory region. + * @param throttlingEnabled Write throttling enabled flag. */ - private Segment(int idx, DirectMemoryRegion region, int cpPoolPages) { + private Segment(int idx, DirectMemoryRegion region, int cpPoolPages, boolean throttlingEnabled) { long totalMemory = region.size(); int pages = (int)(totalMemory / sysPageSize); @@ -1596,9 +1673,9 @@ private Segment(int idx, DirectMemoryRegion region, int cpPoolPages) { DirectMemoryRegion poolRegion = region.slice(memPerTbl + 8); - pool = new PagePool(idx, poolRegion); + pool = new PagePool(idx, poolRegion, null); - maxDirtyPages = Math.min(pool.pages() * 2 / 3, cpPoolPages); + maxDirtyPages = throttlingEnabled ? pool.pages() * 3 / 4 : Math.min(pool.pages() * 2 / 3, cpPoolPages); } /** @@ -1608,6 +1685,13 @@ private boolean safeToUpdate() { return dirtyPages.size() < maxDirtyPages; } + /** + * @param dirtyRatioThreshold Throttle threshold. + */ + private boolean shouldThrottle(double dirtyRatioThreshold) { + return ((double)dirtyPages.size()) / pages() > dirtyRatioThreshold; + } + /** * @return Max number of pages this segment can allocate. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java new file mode 100644 index 0000000000000..d0c67c76e2e23 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java @@ -0,0 +1,104 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.ignite.internal.processors.cache.persistence.pagemem; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.LockSupport; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; + +/** + * Throttles threads that generate dirty pages during ongoing checkpoint. + * Designed to avoid zero dropdowns that can happen if checkpoint buffer is overflowed. + */ +public class PagesWriteThrottle { + /** Page memory. */ + private final PageMemoryImpl pageMemory; + + /** Database manager. */ + private final GridCacheDatabaseSharedManager dbSharedMgr; + + /** Starting throttle time. Limits write speed to 1000 MB/s. */ + private static final long STARTING_THROTTLE_NANOS = 4000; + + /** Backoff ratio. Each next park will be this times longer. */ + private static final double BACKOFF_RATIO = 1.05; + + /** Exponential backoff counter. */ + private final AtomicInteger exponentialBackoffCntr = new AtomicInteger(0); + /** + * @param pageMemory Page memory. + * @param dbSharedMgr Database manager. + */ + public PagesWriteThrottle(PageMemoryImpl pageMemory, GridCacheDatabaseSharedManager dbSharedMgr) { + this.pageMemory = pageMemory; + this.dbSharedMgr = dbSharedMgr; + } + + /** + * + */ + public void onMarkDirty(boolean isInCheckpoint) { + assert dbSharedMgr.checkpointLockIsHeldByThread(); + + AtomicInteger writtenPagesCntr = dbSharedMgr.writtenPagesCounter(); + + if (writtenPagesCntr == null) + return; // Don't throttle if checkpoint is not running. + + boolean shouldThrottle = false; + + if (isInCheckpoint) { + int checkpointBufLimit = pageMemory.checkpointBufferPagesSize() * 2 / 3; + + shouldThrottle = pageMemory.checkpointBufferPagesCount() > checkpointBufLimit; + } + + if (!shouldThrottle) { + int cpWrittenPages = writtenPagesCntr.get(); + + int cpTotalPages = dbSharedMgr.currentCheckpointPagesCount(); + + if (cpWrittenPages == cpTotalPages) { + // Checkpoint is already in fsync stage, increasing maximum ratio of dirty pages to 3/4 + shouldThrottle = pageMemory.shouldThrottle(3.0 / 4); + } else { + double dirtyRatioThreshold = ((double)cpWrittenPages) / cpTotalPages; + + // Starting with 0.05 to avoid throttle right after checkpoint start + // 7/12 is maximum ratio of dirty pages + dirtyRatioThreshold = (dirtyRatioThreshold * 0.95 + 0.05) * 7 / 12; + + shouldThrottle = pageMemory.shouldThrottle(dirtyRatioThreshold); + } + } + + if (shouldThrottle) { + int throttleLevel = exponentialBackoffCntr.getAndIncrement(); + + LockSupport.parkNanos((long)(STARTING_THROTTLE_NANOS * Math.pow(BACKOFF_RATIO, throttleLevel))); + } + else + exponentialBackoffCntr.set(0); + } + + /** + * + */ + public void onFinishCheckpoint() { + exponentialBackoffCntr.set(0); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java index 6f58782f5197a..56d09f8b0f399 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java @@ -82,7 +82,9 @@ public class BPlusTreePageMemoryImplTest extends BPlusTreeSelfTest { return true; } }, - new MemoryMetricsImpl(new MemoryPolicyConfiguration())); + new MemoryMetricsImpl(new MemoryPolicyConfiguration()), + false + ); mem.start(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java index b263d4f81ff17..39183b2b76480 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java @@ -82,7 +82,8 @@ public class BPlusTreeReuseListPageMemoryImplTest extends BPlusTreeReuseSelfTest return true; } }, - new MemoryMetricsImpl(new MemoryPolicyConfiguration()) + new MemoryMetricsImpl(new MemoryPolicyConfiguration()), + false ); mem.start(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java index d9257bd3c444b..a427c63fa1ecb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java @@ -97,6 +97,8 @@ public class MetadataStoragePageMemoryImplTest extends MetadataStorageSelfTest{ return true; } }, - new MemoryMetricsImpl(new MemoryPolicyConfiguration())); + new MemoryMetricsImpl(new MemoryPolicyConfiguration()), + false + ); } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java index 1fff1f023ee80..467ede44d1cdf 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java @@ -88,7 +88,9 @@ public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest { return true; } }, - new MemoryMetricsImpl(new MemoryPolicyConfiguration())); + new MemoryMetricsImpl(new MemoryPolicyConfiguration()), + false + ); } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java index 0366eca6a263a..c5997fa0dfe73 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java @@ -110,7 +110,9 @@ private PageMemoryImpl createPageMemory() throws Exception { return true; } }, - new MemoryMetricsImpl(new MemoryPolicyConfiguration())); + new MemoryMetricsImpl(new MemoryPolicyConfiguration()), + false + ); mem.start(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java new file mode 100644 index 0000000000000..0b44cb6f1ed91 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java @@ -0,0 +1,264 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.ignite.internal.processors.cache.persistence.pagemem; + +import java.io.Serializable; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.MemoryMetrics; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.MemoryConfiguration; +import org.apache.ignite.configuration.MemoryPolicyConfiguration; +import org.apache.ignite.configuration.PersistentStoreConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.ratemetrics.HitRateMetrics; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Test to visualize and debug {@link PagesWriteThrottle}. + * Prints puts/gets rate, number of dirty pages, pages written in current checkpoint and pages in checkpoint buffer. + * Not intended to be part of any test suite. + */ +public class PagesWriteThrottleSandboxTest extends GridCommonAbstractTest { + /** Ip finder. */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Cache name. */ + private static final String CACHE_NAME = "cache1"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoverySpi = (TcpDiscoverySpi)cfg.getDiscoverySpi(); + discoverySpi.setIpFinder(ipFinder); + + MemoryConfiguration dbCfg = new MemoryConfiguration(); + + dbCfg.setMemoryPolicies(new MemoryPolicyConfiguration() + .setMaxSize(4000L * 1024 * 1024) + .setName("dfltMemPlc") + .setMetricsEnabled(true)); + + dbCfg.setDefaultMemoryPolicyName("dfltMemPlc"); + + cfg.setMemoryConfiguration(dbCfg); + + CacheConfiguration ccfg1 = new CacheConfiguration(); + + ccfg1.setName(CACHE_NAME); + ccfg1.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + ccfg1.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + ccfg1.setAffinity(new RendezvousAffinityFunction(false, 64)); + + cfg.setCacheConfiguration(ccfg1); + + cfg.setPersistentStoreConfiguration( + new PersistentStoreConfiguration() + .setWalMode(WALMode.BACKGROUND) + .setCheckpointingFrequency(20_000) + .setCheckpointingPageBufferSize(1000L * 1000 * 1000) + .setWriteThrottlingEnabled(true)); + + cfg.setConsistentId(gridName); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + deleteWorkFiles(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + deleteWorkFiles(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 100 * 60 * 1000; + } + + /** + * @throws Exception if failed. + */ + public void testThrottle() throws Exception { + startGrids(1).active(true); + + try { + Ignite ig = ignite(0); + + final int keyCnt = 4_000_000; + + final AtomicBoolean run = new AtomicBoolean(true); + + HitRateMetrics getRate = new HitRateMetrics(5000, 5); + + GridTestUtils.runMultiThreadedAsync(new Callable() { + @Override public Object call() throws Exception { + while (run.get()) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + int key = rnd.nextInt(keyCnt * 2); + + ignite(0).cache(CACHE_NAME).get(key); + + getRate.onHit(); + } + + return null; + } + }, 2, "read-loader"); + + HitRateMetrics putRate = new HitRateMetrics(1000, 5); + + GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + while (run.get()) { + long dirtyPages = 0; + + for (MemoryMetrics m : ig.memoryMetrics()) + if (m.getName().equals("dfltMemPlc")) + dirtyPages = m.getDirtyPages(); + + long cpBufPages = 0; + + long cpWrittenPages; + + AtomicInteger cntr = ((GridCacheDatabaseSharedManager)(((IgniteEx)ignite(0)) + .context().cache().context().database())).writtenPagesCounter(); + + cpWrittenPages = cntr == null ? 0 : cntr.get(); + + try { + cpBufPages = ((PageMemoryImpl)((IgniteEx)ignite(0)).context().cache().context().database() + .memoryPolicy("dfltMemPlc").pageMemory()).checkpointBufferPagesCount(); + } + catch (IgniteCheckedException e) { + e.printStackTrace(); + } + + System.out.println("@@@ putsPerSec=," + (putRate.getRate()) + ", getsPerSec=," + (getRate.getRate()) + ", dirtyPages=," + dirtyPages + ", cpWrittenPages=," + cpWrittenPages +", cpBufPages=," + cpBufPages); + + try { + Thread.sleep(1000); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + }, "metrics-view"); + + try (IgniteDataStreamer ds = ig.dataStreamer(CACHE_NAME)) { + ds.allowOverwrite(true); + + for (int i = 0; i < keyCnt * 10; i++) { + ds.addData(ThreadLocalRandom.current().nextInt(keyCnt), new TestValue(ThreadLocalRandom.current().nextInt(), + ThreadLocalRandom.current().nextInt())); + + putRate.onHit(); + } + } + + run.set(false); + } + finally { + stopAllGrids(); + } + } + + /** + * + */ + private static class TestValue implements Serializable { + /** */ + private final int v1; + + /** */ + private final int v2; + + /** */ + private byte[] payload = new byte[400 + ThreadLocalRandom.current().nextInt(20)]; + + /** + * @param v1 Value 1. + * @param v2 Value 2. + */ + private TestValue(int v1, int v2) { + this.v1 = v1; + this.v2 = v2; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestValue val = (TestValue)o; + + return v1 == val.v1 && v2 == val.v2; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = v1; + + res = 31 * res + v2; + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TestValue.class, this); + } + } + + /** + * @throws IgniteCheckedException If failed. + */ + private void deleteWorkFiles() throws IgniteCheckedException { + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "snapshot", false)); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java new file mode 100644 index 0000000000000..772d390633406 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java @@ -0,0 +1,322 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.ignite.internal.processors.cache.persistence.pagemem; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.LockSupport; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.MemoryConfiguration; +import org.apache.ignite.configuration.MemoryPolicyConfiguration; +import org.apache.ignite.configuration.PersistentStoreConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; +import org.apache.ignite.internal.processors.cache.ratemetrics.HitRateMetrics; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class PagesWriteThrottleSmokeTest extends GridCommonAbstractTest { + /** Ip finder. */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Slow checkpoint enabled. */ + private final AtomicBoolean slowCheckpointEnabled = new AtomicBoolean(true); + + /** Cache name. */ + private static final String CACHE_NAME = "cache1"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoverySpi = (TcpDiscoverySpi)cfg.getDiscoverySpi(); + discoverySpi.setIpFinder(ipFinder); + + MemoryConfiguration dbCfg = new MemoryConfiguration(); + + dbCfg.setMemoryPolicies(new MemoryPolicyConfiguration() + .setMaxSize(400 * 1024 * 1024) + .setName("dfltMemPlc") + .setMetricsEnabled(true)); + + dbCfg.setDefaultMemoryPolicyName("dfltMemPlc"); + + cfg.setMemoryConfiguration(dbCfg); + + CacheConfiguration ccfg1 = new CacheConfiguration(); + + ccfg1.setName(CACHE_NAME); + ccfg1.setAtomicityMode(CacheAtomicityMode.ATOMIC); + ccfg1.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC); + ccfg1.setAffinity(new RendezvousAffinityFunction(false, 64)); + + cfg.setCacheConfiguration(ccfg1); + + cfg.setPersistentStoreConfiguration( + new PersistentStoreConfiguration() + .setWalMode(WALMode.BACKGROUND) + .setCheckpointingFrequency(20_000) + .setCheckpointingPageBufferSize(200 * 1000 * 1000) + .setWriteThrottlingEnabled(true) + .setCheckpointingThreads(1) + .setFileIOFactory(new SlowCheckpointFileIOFactory())); + + cfg.setConsistentId(gridName); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + deleteWorkFiles(); + + slowCheckpointEnabled.set(true); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + deleteWorkFiles(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 6 * 60 * 1000; + } + + /** + * @throws Exception if failed. + */ + public void testThrottle() throws Exception { + startGrids(2).active(true); + + try { + Ignite ig = ignite(0); + + final int keyCnt = 2_000_000; + + final AtomicBoolean run = new AtomicBoolean(true); + + final AtomicBoolean zeroDropdown = new AtomicBoolean(false); + + final HitRateMetrics putRate10secs = new HitRateMetrics(10_000, 20); + + final HitRateMetrics putRate1sec = new HitRateMetrics(1_000, 20); + + GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + try { + Thread.sleep(5000); + + while (run.get()) { + System.out.println( + "Put rate over last 10 seconds: " + (putRate10secs.getRate() / 10) + + " puts/sec, over last 1 second: " + putRate1sec.getRate()); + + if (putRate10secs.getRate() == 0) { + zeroDropdown.set(true); + + run.set(false); + } + + Thread.sleep(1000); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + finally { + run.set(false); + } + } + }, "rate-checker"); + + final IgniteCache cache = ig.getOrCreateCache(CACHE_NAME); + + GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + long startTs = System.currentTimeMillis(); + + for (int i = 0; i < keyCnt * 10 && System.currentTimeMillis() - startTs < 3 * 60 * 1000; i++) { + if (!run.get()) + break; + + cache.put(ThreadLocalRandom.current().nextInt(keyCnt), new TestValue(ThreadLocalRandom.current().nextInt(), + ThreadLocalRandom.current().nextInt())); + + putRate10secs.onHit(); + + putRate1sec.onHit(); + } + + run.set(false); + } + }, "loader"); + + while (run.get()) + LockSupport.parkNanos(10_000); + + if (zeroDropdown.get()) { + slowCheckpointEnabled.set(false); + + IgniteInternalFuture cpFut1 = ((IgniteEx)ignite(0)).context().cache().context().database() + .wakeupForCheckpoint("test"); + + IgniteInternalFuture cpFut2 = ((IgniteEx)ignite(1)).context().cache().context().database() + .wakeupForCheckpoint("test"); + + cpFut1.get(); + + cpFut2.get(); + + fail("Put rate degraded to zero for at least 10 seconds"); + } + } + finally { + stopAllGrids(); + } + } + + /** + * + */ + private static class TestValue implements Serializable { + /** */ + private final int v1; + + /** */ + private final int v2; + + /** */ + private byte[] payload = new byte[400 + ThreadLocalRandom.current().nextInt(20)]; + + /** + * @param v1 Value 1. + * @param v2 Value 2. + */ + private TestValue(int v1, int v2) { + this.v1 = v1; + this.v2 = v2; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestValue val = (TestValue)o; + + return v1 == val.v1 && v2 == val.v2; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = v1; + + res = 31 * res + v2; + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TestValue.class, this); + } + } + + /** + * @throws IgniteCheckedException If failed. + */ + private void deleteWorkFiles() throws IgniteCheckedException { + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "snapshot", false)); + } + + /** + * Create File I/O that emulates poor checkpoint write speed. + */ + private class SlowCheckpointFileIOFactory implements FileIOFactory { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Delegate factory. */ + private final FileIOFactory delegateFactory = new RandomAccessFileIOFactory(); + + /** {@inheritDoc} */ + @Override public FileIO create(File file) throws IOException { + return create(file, "rw"); + } + + /** {@inheritDoc} */ + @Override public FileIO create(File file, String mode) throws IOException { + FileIO delegate = delegateFactory.create(file, mode); + + return new FileIODecorator(delegate) { + @Override public int write(ByteBuffer srcBuf) throws IOException { + if (slowCheckpointEnabled.get() && Thread.currentThread().getName().contains("checkpoint")) + LockSupport.parkNanos(5_000_000); + + return delegate.write(srcBuf); + } + + @Override public int write(ByteBuffer srcBuf, long position) throws IOException { + if (slowCheckpointEnabled.get() && Thread.currentThread().getName().contains("checkpoint")) + LockSupport.parkNanos(5_000_000); + + return delegate.write(srcBuf, position); + } + + @Override public void write(byte[] buf, int off, int len) throws IOException { + if (slowCheckpointEnabled.get() && Thread.currentThread().getName().contains("checkpoint")) + LockSupport.parkNanos(5_000_000); + + delegate.write(buf, off, len); + } + }; + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java index b2a1f65410859..ef7682f37dfef 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java @@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.persistence.pagemem.MetadataStoragePageMemoryImplTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImplNoLoadTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImplTest; +import org.apache.ignite.internal.processors.cache.persistence.pagemem.PagesWriteThrottleSmokeTest; import org.apache.ignite.internal.processors.database.IgniteDbClientNearCachePutGetTest; import org.apache.ignite.internal.processors.database.IgniteDbDynamicCacheSelfTest; import org.apache.ignite.internal.processors.database.IgniteDbMultiNodePutGetTest; @@ -80,6 +81,9 @@ public static TestSuite suite() throws Exception { suite.addTestSuite(DefaultPageSizeBackwardsCompatibilityTest.class); + // Write throttling + suite.addTestSuite(PagesWriteThrottleSmokeTest.class); + return suite; } } From 7ffa0a779d5b35462bb75071950bf5a802d76ab9 Mon Sep 17 00:00:00 2001 From: Ivan Rakov Date: Thu, 21 Sep 2017 11:18:01 +0300 Subject: [PATCH 2/4] IGNITE-6334 Throttle writing threads during ongoing checkpoint with token bucket algorithm --- .../ignite/configuration/PersistentStoreConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java index 298f1e0f5a6bb..585417225fbed 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java @@ -83,7 +83,7 @@ public class PersistentStoreConfiguration implements Serializable { public static final String DFLT_WAL_ARCHIVE_PATH = "db/wal/archive"; /** Default write throttling enabled. */ - public static final boolean DFLT_WRITE_THROTTLING_ENABLED = false; + public static final boolean DFLT_WRITE_THROTTLING_ENABLED = true; /** */ private String persistenceStorePath; From 7e6143ba926e572443270b974b05d9b324fbecf3 Mon Sep 17 00:00:00 2001 From: Ivan Rakov Date: Thu, 21 Sep 2017 15:42:24 +0300 Subject: [PATCH 3/4] IGNITE-6334 CE fix --- .../persistence/pagemem/PagesWriteThrottleSandboxTest.java | 4 ++-- .../persistence/pagemem/PagesWriteThrottleSmokeTest.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java index 0b44cb6f1ed91..e188ee4575b12 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java @@ -122,13 +122,13 @@ public void testThrottle() throws Exception { startGrids(1).active(true); try { - Ignite ig = ignite(0); + final Ignite ig = ignite(0); final int keyCnt = 4_000_000; final AtomicBoolean run = new AtomicBoolean(true); - HitRateMetrics getRate = new HitRateMetrics(5000, 5); + final HitRateMetrics getRate = new HitRateMetrics(5000, 5); GridTestUtils.runMultiThreadedAsync(new Callable() { @Override public Object call() throws Exception { diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java index 772d390633406..12a601d0512e1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java @@ -293,7 +293,7 @@ private class SlowCheckpointFileIOFactory implements FileIOFactory { /** {@inheritDoc} */ @Override public FileIO create(File file, String mode) throws IOException { - FileIO delegate = delegateFactory.create(file, mode); + final FileIO delegate = delegateFactory.create(file, mode); return new FileIODecorator(delegate) { @Override public int write(ByteBuffer srcBuf) throws IOException { From a957fd43d5d7217c9ff3dc8f19dfe77a013c0555 Mon Sep 17 00:00:00 2001 From: Ivan Rakov Date: Thu, 21 Sep 2017 16:41:34 +0300 Subject: [PATCH 4/4] IGNITE-6334 CE fix --- .../persistence/pagemem/PagesWriteThrottleSandboxTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java index e188ee4575b12..409ab8459cda1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java @@ -146,7 +146,7 @@ public void testThrottle() throws Exception { } }, 2, "read-loader"); - HitRateMetrics putRate = new HitRateMetrics(1000, 5); + final HitRateMetrics putRate = new HitRateMetrics(1000, 5); GridTestUtils.runAsync(new Runnable() { @Override public void run() {