Skip to content

Commit

Permalink
GG-33675 Speed-based throttling hasn't managed to protect the checkpo…
Browse files Browse the repository at this point in the history
…int buffer of a node (apache#2187)

- speed-based throttling had a bug that prevented it to protect CP Buffer when CP progress was not yet reported; it's fixed here
- speed-based throttling has been heavily refactored to make it easier to understand
- the machinery comprising and surrounding write throttling has been commented
  • Loading branch information
rpuch committed Nov 23, 2021
1 parent c2e944f commit 953902d
Show file tree
Hide file tree
Showing 17 changed files with 1,295 additions and 432 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ private GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> writePages(
pageMem.checkpointWritePage(fullId, tmpWriteBuf, pageStoreWriter, tracker);

if (throttlingEnabled) {
while (pageMem.shouldThrottle()) {
while (pageMem.isCpBufferOverflowThresholdExceeded()) {
FullPageId cpPageId = pageMem.pullPageFromCpBuffer();

if (cpPageId.equals(FullPageId.NULL_PAGE))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2021 GridGain Systems, Inc. and Contributors.
*
* Licensed under the GridGain Community Edition License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache.persistence.pagemem;

import static org.apache.ignite.internal.processors.cache.persistence.pagemem.PagesWriteThrottlePolicy.CP_BUF_FILL_THRESHOLD;

/**
* Logic used to determine whether Checkpoint Buffer is in danger zone and writer threads should be throttled.
*/
class CheckpointBufferOverflowWatchdog {
/** Page memory. */
private final PageMemoryImpl pageMemory;

/**
* Creates a new instance.
*
* @param pageMemory page memory to use
*/
CheckpointBufferOverflowWatchdog(PageMemoryImpl pageMemory) {
this.pageMemory = pageMemory;
}

/**
* Returns true if Checkpoint Buffer is in danger zone (more than
* {@link PagesWriteThrottlePolicy#CP_BUF_FILL_THRESHOLD} of the buffer is filled) and, hence, writer threads need
* to be throttled.
*
* @return {@code true} iff Checkpoint Buffer is in danger zone
*/
boolean isInDangerZone() {
int checkpointBufLimit = (int)(pageMemory.checkpointBufferPagesSize() * CP_BUF_FILL_THRESHOLD);

return pageMemory.checkpointBufferPagesCount() > checkpointBufLimit;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2021 GridGain Systems, Inc. and Contributors.
*
* Licensed under the GridGain Community Edition License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache.persistence.pagemem;

import java.util.concurrent.atomic.AtomicInteger;

/**
* Implements exponential backoff logic. Contains a counter and increments it on each {@link #nextDuration()}.
* May be reset using {@link #reset()}.
*/
class ExponentialBackoff {
/**
* Starting backoff duration.
*/
private final long startingBackoffNanos;

/**
* Backoff ratio. Each next duration will be this times longer.
*/
private final double backoffRatio;

/**
* Exponential backoff counter.
*/
private final AtomicInteger exponentialBackoffCounter = new AtomicInteger(0);

/**
* Constructs a new instance with the given parameters.
*
* @param startingBackoffNanos duration of first backoff in nanoseconds
* @param backoffRatio each next duration will be this times longer
*/
public ExponentialBackoff(long startingBackoffNanos, double backoffRatio) {
this.startingBackoffNanos = startingBackoffNanos;
this.backoffRatio = backoffRatio;
}

/**
* Returns next backoff duration (in nanoseconds). As a side effect, increments the backoff counter so that
* next call will return a longer duration.
*
* @return next backoff duration in nanoseconds
*/
public long nextDuration() {
int exponent = exponentialBackoffCounter.getAndIncrement();
return (long) (startingBackoffNanos * Math.pow(backoffRatio, exponent));
}

/**
* Resets the exponential backoff counter so that next call to {@link #nextDuration()}
* will return {@link #startingBackoffNanos}.
*
* @return {@code true} iff this backoff was not already in a reset state
*/
public boolean reset() {
int oldValue = exponentialBackoffCounter.getAndSet(0);
return oldValue != 0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2021 GridGain Systems, Inc. and Contributors.
*
* Licensed under the GridGain Community Edition License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache.persistence.pagemem;

/**
* Logic used to protect memory (mainly, Checkpoint Buffer) from exhaustion using exponential backoff.
*/
class ExponentialBackoffThrottlingStrategy {
/**
* Starting throttle time. Limits write speed to 1000 MB/s.
*/
private static final long STARTING_THROTTLE_NANOS = 4000;

/**
* Backoff ratio. Each next park will be this times longer.
*/
private static final double BACKOFF_RATIO = 1.05;

/**
* Exponential backoff used to throttle threads.
*/
private final ExponentialBackoff backoff = new ExponentialBackoff(STARTING_THROTTLE_NANOS, BACKOFF_RATIO);

/**
* Computes next duration (in nanos) to throttle a thread to protect Checkpoint Buffer.
*
* @return park time in nanos
*/
long protectionParkTime() {
return backoff.nextDuration();
}

/**
* Resets the backoff counter. Invoked when no throttling is needed anymore.
*
* @return {@code true} iff the backoff was not already in a reset state
*/
boolean resetBackoff() {
return backoff.reset();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ public long acquirePage(int grpId, long pageId, IoStatisticsHolder statHldr,
* @throws IgniteException If checkpoint has been already started and was not finished.
* @param allowToReplace The sign which allows to replace pages from a checkpoint by page replacer.
*/
public GridMultiCollectionWrapper<FullPageId> beginCheckpoint(IgniteInternalFuture allowToReplace) throws IgniteException;
public GridMultiCollectionWrapper<FullPageId> beginCheckpoint(IgniteInternalFuture allowToReplace)
throws IgniteException;

/**
* Finishes checkpoint operation.
Expand Down Expand Up @@ -177,9 +178,12 @@ public void checkpointWritePage(
public FullPageId pullPageFromCpBuffer();

/**
* Calculates throttling condition.
* Checks if the Checkpoint Buffer is currently close to exhaustion.
*
* @return {@code true} if measures like throttling to protect Checkpoint Buffer should be applied,
* and {@code false} otherwise.
*/
public boolean shouldThrottle();
public boolean isCpBufferOverflowThresholdExceeded();

/**
* Total pages can be placed to memory.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -937,7 +937,7 @@ private void releaseCheckpointBufferPage(long tmpBufPtr) {
int resCntr = checkpointPool.releaseFreePage(tmpBufPtr);

if (resCntr == checkpointBufferPagesSize() / 2 && writeThrottle != null)
writeThrottle.tryWakeupThrottledThreads();
writeThrottle.wakeupThrottledThreads();
}

/**
Expand Down Expand Up @@ -1897,8 +1897,8 @@ public static int segmentIndex(int grpId, long pageId, int segments) {
}

/** {@inheritDoc} */
@Override public boolean shouldThrottle() {
return writeThrottle.shouldThrottle();
@Override public boolean isCpBufferOverflowThresholdExceeded() {
return writeThrottle.isCpBufferOverflowThresholdExceeded();
}

/** {@inheritDoc} */
Expand Down
Loading

0 comments on commit 953902d

Please sign in to comment.