Skip to content

Commit

Permalink
GG-24734 Append additional cp tracking activity - pages sort.
Browse files Browse the repository at this point in the history
  • Loading branch information
vmalin committed Oct 25, 2019
1 parent f773e5c commit 2f62050
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 13 deletions.
Expand Up @@ -122,6 +122,13 @@ public long pageId() {
return pageId;
}

/**
* @return Effective page ID.
*/
public long effectivePageId() {
return effectivePageId;
}

/**
* @return Cache group ID.
*/
Expand Down
Expand Up @@ -97,7 +97,6 @@
import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
Expand Down Expand Up @@ -3698,9 +3697,18 @@ private void waitCompleted() throws IgniteCheckedException {
@SuppressWarnings("NakedNotify")
public class Checkpointer extends GridWorker {
/** Checkpoint started log message format. */
private static final String CHECKPOINT_STARTED_LOG_FORMAT = "Checkpoint started [checkpointId=%s, startPtr=%s," +
" checkpointBeforeLockTime=%dms, checkpointLockWait=%dms, checkpointListenersExecuteTime=%dms, " +
"checkpointLockHoldTime=%dms, walCpRecordFsyncDuration=%dms, %s pages=%d, reason='%s']";
private static final String CHECKPOINT_STARTED_LOG_FORMAT = "Checkpoint started [" +
"checkpointId=%s, " +
"startPtr=%s, " +
"checkpointBeforeLockTime=%dms, " +
"checkpointLockWait=%dms, " +
"checkpointListenersExecuteTime=%dms, " +
"checkpointLockHoldTime=%dms, " +
"walCpRecordFsyncDuration=%dms, " +
"writeCheckpointEntryDuration=%dms, " +
"splitAndSortCpPagesDuration=%dms, " +
"%s pages=%d, " +
"reason='%s']";

/** Temporary write buffer. */
private final ByteBuffer tmpWriteBuf;
Expand Down Expand Up @@ -4374,9 +4382,13 @@ private Checkpoint markCheckpointBegin(CheckpointMetricsTracker tracker) throws

curr.transitTo(MARKER_STORED_TO_DISK);

tracker.onSplitAndSortCpPagesStart();

GridMultiCollectionWrapper<FullPageId> cpPages = splitAndSortCpPagesIfNeeded(
cpPagesTuple, persistenceCfg.getCheckpointThreads());

tracker.onSplitAndSortCpPagesEnd();

if (printCheckpointStats && log.isInfoEnabled()) {
long possibleJvmPauseDur = possibleLongJvmPauseDuration(tracker);

Expand All @@ -4391,6 +4403,8 @@ private Checkpoint markCheckpointBegin(CheckpointMetricsTracker tracker) throws
tracker.listenersExecuteDuration(),
tracker.lockHoldDuration(),
tracker.walCpRecordFsyncDuration(),
tracker.writeCheckpointEntryDuration(),
tracker.splitAndSortCpPagesDuration(),
possibleJvmPauseDur > 0 ? "possibleJvmPauseDuration=" + possibleJvmPauseDur + "ms," : "",
cpPages.size(),
curr.reason
Expand Down Expand Up @@ -4848,8 +4862,7 @@ private GridMultiCollectionWrapper<FullPageId> splitAndSortCpPagesIfNeeded(
if (cmp != 0)
return cmp;

return Long.compare(PageIdUtils.effectivePageId(o1.pageId()),
PageIdUtils.effectivePageId(o2.pageId()));
return Long.compare(o1.effectivePageId(), o2.effectivePageId());
}
};

Expand Down
Expand Up @@ -16,8 +16,12 @@

package org.apache.ignite.internal.processors.cache.persistence.pagemem;

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntry;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntryType;

/**
* Tracks various checkpoint phases and stats.
Expand Down Expand Up @@ -75,6 +79,12 @@ public class CheckpointMetricsTracker {
/** */
private long walCpRecordFsyncEnd;

/** */
private long splitAndSortCpPagesStart;

/** */
private long splitAndSortCpPagesEnd;

/** */
private long listenersExecEnd;

Expand Down Expand Up @@ -162,6 +172,20 @@ public void onWalCpRecordFsyncStart() {
walCpRecordFsyncStart = System.currentTimeMillis();
}

/**
*
*/
public void onSplitAndSortCpPagesStart() {
splitAndSortCpPagesStart = System.currentTimeMillis();
}

/**
*
*/
public void onSplitAndSortCpPagesEnd() {
splitAndSortCpPagesEnd = System.currentTimeMillis();
}

/**
*
*/
Expand Down Expand Up @@ -232,6 +256,22 @@ public long walCpRecordFsyncDuration() {
return walCpRecordFsyncEnd - walCpRecordFsyncStart;
}

/**
* @return Duration of checkpoint entry buffer writing to file.
*
* @see GridCacheDatabaseSharedManager#writeCheckpointEntry(ByteBuffer, CheckpointEntry, CheckpointEntryType)
*/
public long writeCheckpointEntryDuration() {
return splitAndSortCpPagesStart - walCpRecordFsyncEnd;
}

/**
* @return Duration of splitting and sorting checkpoint pages.
*/
public long splitAndSortCpPagesDuration() {
return splitAndSortCpPagesEnd - splitAndSortCpPagesStart;
}

/**
* @return Checkpoint start time.
*/
Expand Down
Expand Up @@ -1253,7 +1253,7 @@ private boolean isThrottlingEnabled() {
relPtr = refreshOutdatedPage(
seg,
fullId.groupId(),
PageIdUtils.effectivePageId(fullId.pageId()),
fullId.effectivePageId(),
true
);

Expand Down Expand Up @@ -1413,7 +1413,7 @@ private int generationTag(Segment seg, FullPageId fullId) {
private long resolveRelativePointer(Segment seg, FullPageId fullId, int reqVer) {
return seg.loadedPages.get(
fullId.groupId(),
PageIdUtils.effectivePageId(fullId.pageId()),
fullId.effectivePageId(),
reqVer,
INVALID_REL_PTR,
OUTDATED_REL_PTR
Expand Down Expand Up @@ -1543,7 +1543,7 @@ public long acquiredPages() {
*/
public boolean hasLoadedPage(FullPageId fullPageId) {
int grpId = fullPageId.groupId();
long pageId = PageIdUtils.effectivePageId(fullPageId.pageId());
long pageId = fullPageId.effectivePageId();
int partId = PageIdUtils.partId(pageId);

Segment seg = segment(grpId, pageId);
Expand Down Expand Up @@ -2517,7 +2517,7 @@ else if (dirtyAddr != INVALID_REL_PTR)

loadedPages.remove(
fullPageId.groupId(),
PageIdUtils.effectivePageId(fullPageId.pageId())
fullPageId.effectivePageId()
);

return relRmvAddr;
Expand Down Expand Up @@ -2590,7 +2590,7 @@ private long tryToFindSequentially(int cap, PageStoreWriter saveDirtyPage) throw
if (preparePageRemoval(fullPageId, absEvictAddr, saveDirtyPage)) {
loadedPages.remove(
fullPageId.groupId(),
PageIdUtils.effectivePageId(fullPageId.pageId())
fullPageId.effectivePageId()
);

return addr;
Expand Down
Expand Up @@ -233,11 +233,11 @@ private void runDeadlockScenario() throws Exception {
pickedPages.sort(new Comparator<FullPageId>() {
@Override public int compare(FullPageId o1, FullPageId o2) {
int cmp = Long.compare(o1.groupId(), o2.groupId());

if (cmp != 0)
return cmp;

return Long.compare(PageIdUtils.effectivePageId(o1.pageId()),
PageIdUtils.effectivePageId(o2.pageId()));
return Long.compare(o1.effectivePageId(), o2.effectivePageId());
}
});

Expand Down
@@ -0,0 +1,100 @@
/*
* Copyright 2019 GridGain Systems, Inc. and Contributors.
*
* Licensed under the GridGain Community Edition License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.spi.checkpoint.cache;

import java.util.regex.Pattern;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

/**
* Checks correct checkpoint times logging
*/
public class CheckCheckpointStartLoggingTest extends GridCommonAbstractTest {
/** */
private static final String VALID_MS_PATTERN = "[0-9]*ms";

/** */
private static final String CHECKPOINT_STARTED_LOG_FORMAT = "Checkpoint started .*" +
"checkpointBeforeLockTime=" + VALID_MS_PATTERN + ", " +
"checkpointLockWait=" + VALID_MS_PATTERN + ", " +
"checkpointListenersExecuteTime="+ VALID_MS_PATTERN +", " +
"checkpointLockHoldTime="+ VALID_MS_PATTERN + ", " +
"walCpRecordFsyncDuration="+ VALID_MS_PATTERN +", " +
"writeCheckpointEntryDuration="+ VALID_MS_PATTERN +", " +
"splitAndSortCpPagesDuration="+ VALID_MS_PATTERN +", " +
".* pages=[1-9][0-9]*, " +
"reason=.*";

/** */
private ListeningTestLogger testLogger = new ListeningTestLogger(false, log);

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

DataStorageConfiguration memCfg = new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration()
.setMaxSize(10 * 1024 * 1024)
.setPersistenceEnabled(true));

cfg.setDataStorageConfiguration(memCfg);

cfg.setGridLogger(testLogger);

return cfg;
}

/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();

cleanPersistenceDir();

super.afterTest();
}

/**
* @throws Exception if failed.
*/
@Test
public void testCheckpointLogging() throws Exception {
LogListener lsnr = LogListener.matches(Pattern.compile(CHECKPOINT_STARTED_LOG_FORMAT)).build();

testLogger.registerListener(lsnr);

Ignite ignite = startGrid();

ignite.cluster().active(true);

IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(DEFAULT_CACHE_NAME);

for (int i = 0; i < 1000; i++)
cache.put(i, i);

forceCheckpoint();

assertTrue(lsnr.check());
}
}
Expand Up @@ -20,6 +20,7 @@
import org.apache.ignite.spi.checkpoint.cache.CacheCheckpointSpiSecondCacheSelfTest;
import org.apache.ignite.spi.checkpoint.cache.CacheCheckpointSpiSelfTest;
import org.apache.ignite.spi.checkpoint.cache.CacheCheckpointSpiStartStopSelfTest;
import org.apache.ignite.spi.checkpoint.cache.CheckCheckpointStartLoggingTest;
import org.apache.ignite.spi.checkpoint.jdbc.JdbcCheckpointSpiConfigSelfTest;
import org.apache.ignite.spi.checkpoint.jdbc.JdbcCheckpointSpiCustomConfigSelfTest;
import org.apache.ignite.spi.checkpoint.jdbc.JdbcCheckpointSpiDefaultConfigSelfTest;
Expand All @@ -40,6 +41,7 @@
CacheCheckpointSpiSelfTest.class,
CacheCheckpointSpiStartStopSelfTest.class,
CacheCheckpointSpiSecondCacheSelfTest.class,
CheckCheckpointStartLoggingTest.class,

// JDBC.
JdbcCheckpointSpiConfigSelfTest.class,
Expand Down

0 comments on commit 2f62050

Please sign in to comment.