Skip to content

Commit

Permalink
IGNITE-15246 Fix of an error when starting a node due to exceeding the DataStorageConfiguration#getMaxWalArchiveSize (#9299)
Browse files Browse the repository at this point in the history
  • Loading branch information
tkalkirill committed Aug 5, 2021
1 parent 8f2d1fa commit d015707
Show file tree
Hide file tree
Showing 11 changed files with 319 additions and 101 deletions.
Expand Up @@ -226,4 +226,9 @@ public WALIterator replay(
* @return Last written pointer.
*/
WALPointer lastWritePointer();

/**
* Starts automatically releasing segments when the WAL archive size reaches
* {@link DataStorageConfiguration#getMaxWalArchiveSize()}.
*/
void startAutoReleaseSegments();
}
Expand Up @@ -1966,6 +1966,8 @@ private CheckpointStatus readCheckpointStatus() throws IgniteCheckedException {

cctx.tm().clearUncommitedStates();

cctx.wal().startAutoReleaseSegments();

if (recoveryVerboseLogging && log.isInfoEnabled()) {
log.info("Partition states information after LOGICAL RECOVERY phase:");

Expand Down
Expand Up @@ -3154,7 +3154,7 @@ public long lastTruncatedSegment() {
*
* @return Size in bytes.
*/
public long totalSize(FileDescriptor... fileDescriptors) {
public static long totalSize(FileDescriptor... fileDescriptors) {
long len = 0;

for (FileDescriptor descriptor : fileDescriptors)
Expand Down Expand Up @@ -3576,4 +3576,9 @@ static long minWalArchiveSize(DataStorageConfiguration dsCfg) {
return max == UNLIMITED_WAL_ARCHIVE ? max : min != HALF_MAX_WAL_ARCHIVE_SIZE ? min :
percentage == -1 ? max / 2 : (long)(max * percentage);
}

/** {@inheritDoc} */
@Override public void startAutoReleaseSegments() {
// Pure delegation: the actual switch lives behind {@code segmentAware}.
segmentAware.startAutoReleaseSegments();
}
}
Expand Up @@ -22,6 +22,7 @@
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -50,6 +51,9 @@ class SegmentArchiveSizeStorage {
/** WAL archive size unlimited. */
private final boolean walArchiveUnlimited;

/** Automatically release segments. Guarded by {@code this}. */
private boolean autoRelease;

/**
* Segment sizes. Mapping: segment idx -> size in bytes. Guarded by {@code this}.
* {@code null} if {@link #walArchiveUnlimited} == {@code true}.
Expand Down Expand Up @@ -103,8 +107,7 @@ public SegmentArchiveSizeStorage(
* or negative (e.g. when it is removed from the archive).
*/
void changeSize(long idx, long sizeChange) {
long releaseIdx = -1;
int releaseCnt = 0;
T2<Long, Integer> forceReleaseSegments = null;

synchronized (this) {
walArchiveSize += sizeChange;
Expand All @@ -118,31 +121,14 @@ void changeSize(long idx, long sizeChange) {
}

if (sizeChange > 0) {
if (!walArchiveUnlimited && walArchiveSize >= maxWalArchiveSize) {
long size = 0;

for (Map.Entry<Long, Long> e : segmentSizes.entrySet()) {
releaseIdx = e.getKey();
releaseCnt++;

if (walArchiveSize - (size += e.getValue()) < minWalArchiveSize)
break;
}
}
forceReleaseSegments = calcForceReleaseSegments();

notifyAll();
}
}

if (releaseIdx != -1) {
if (log.isInfoEnabled()) {
log.info("Maximum size of the WAL archive exceeded, the segments will be forcibly released [" +
"maxWalArchiveSize=" + U.humanReadableByteCount(maxWalArchiveSize) + ", releasedSegmentCnt=" +
releaseCnt + ", lastReleasedSegmentIdx=" + releaseIdx + ']');
}

reservationStorage.forceRelease(releaseIdx);
}
if (forceReleaseSegments != null)
forceReleaseSegments(forceReleaseSegments.get1(), forceReleaseSegments.get2());
}

/**
Expand Down Expand Up @@ -214,4 +200,64 @@ synchronized long currentSize() {
}
}
}

/**
 * Enables automatic release of segments once the WAL archive size reaches
 * {@link DataStorageConfiguration#getMaxWalArchiveSize()}. Does nothing if the WAL archive is unlimited.
 * <p>
 * If the limit is already reached at the moment of the call, the segments to release are calculated
 * immediately and their forced release is performed right away.
 */
void startAutoReleaseSegments() {
    if (walArchiveUnlimited)
        return;

    T2<Long, Integer> toRelease;

    synchronized (this) {
        autoRelease = true;

        toRelease = calcForceReleaseSegments();
    }

    // The forced release itself is done outside of the synchronized section.
    if (toRelease != null)
        forceReleaseSegments(toRelease.get1(), toRelease.get2());
}

/**
 * Calculates the segments that have to be forcibly released.
 * <p>
 * Nothing is calculated unless the WAL archive is limited, automatic release has been started and the current
 * archive size has reached the maximum. Otherwise segments are taken in the iteration order of
 * {@code segmentSizes} until the archive size without them drops below the minimum archive size.
 *
 * @return Pair: absolute index of the last segment to release (inclusive), number of segments to release;
 *      {@code null} if no forced release is required.
 */
@Nullable private synchronized T2<Long, Integer> calcForceReleaseSegments() {
    if (walArchiveUnlimited || !autoRelease || walArchiveSize < maxWalArchiveSize)
        return null;

    long lastIdx = -1;
    int cnt = 0;

    // Total size of the segments selected for release so far.
    long releasedBytes = 0;

    for (Map.Entry<Long, Long> entry : segmentSizes.entrySet()) {
        lastIdx = entry.getKey();
        cnt++;

        releasedBytes += entry.getValue();

        if (walArchiveSize - releasedBytes < minWalArchiveSize)
            break;
    }

    // No tracked segments at all: nothing to release.
    if (lastIdx == -1)
        return null;

    return new T2<>(lastIdx, cnt);
}

/**
 * Forcibly releases reserved segments, logging the fact at the info level beforehand.
 *
 * @param absIdx Absolute index of the last segment to release (inclusive).
 * @param cnt Number of segments being released, used for logging only.
 */
private void forceReleaseSegments(long absIdx, int cnt) {
    if (log.isInfoEnabled()) {
        String maxSize = U.humanReadableByteCount(maxWalArchiveSize);

        log.info("Maximum size of the WAL archive exceeded, the segments will be forcibly released [" +
            "maxWalArchiveSize=" + maxSize + ", releasedSegmentCnt=" + cnt +
            ", lastReleasedSegmentIdx=" + absIdx + ']');
    }

    reservationStorage.forceRelease(absIdx);
}
}
Expand Up @@ -384,4 +384,11 @@ public void lastCheckpointIdx(long absIdx) {
public long awaitAvailableTruncateArchive() throws IgniteInterruptedCheckedException {
return truncateStorage.awaitAvailableTruncate();
}

/**
* Starts automatically releasing segments when the WAL archive size reaches
* {@link DataStorageConfiguration#getMaxWalArchiveSize()}. Delegates to {@code archiveSizeStorage}.
*/
public void startAutoReleaseSegments() {
archiveSizeStorage.startAutoReleaseSegments();
}
}
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.cache;

import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;

/**
 * Common base for tests that check the release of WAL segments while some operation is in progress.
 * Provides a persistent node configuration with small WAL segments and a few helpers.
 */
public abstract class AbstractReleaseSegmentTest extends GridCommonAbstractTest {
    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        super.beforeTest();

        stopAllGrids();
        cleanPersistenceDir();
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        super.afterTest();

        stopAllGrids();
        cleanPersistenceDir();
    }

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        cfg.setFailureHandler(new StopNodeFailureHandler());

        // Persistence is enabled for the default region, WAL segments are 2 MB each.
        cfg.setDataStorageConfiguration(
            new DataStorageConfiguration()
                .setWalSegmentSize((int)(2 * U.MB))
                .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))
        );

        cfg.setCacheConfiguration(
            new CacheConfiguration<>(DEFAULT_CACHE_NAME)
                .setAffinity(new RendezvousAffinityFunction(false, 2))
                .setBackups(1)
        );

        return cfg;
    }

    /**
     * Puts {@code cnt} entries of 64 KB each into the given cache, forcing a checkpoint before
     * every 100th update (including the very first one).
     *
     * @param cache Cache.
     * @param cnt Entry count.
     * @param o Key offset.
     * @throws Exception If failed.
     */
    protected void populate(IgniteCache<Integer, ? super Object> cache, int cnt, int o) throws Exception {
        for (int i = 0; i < cnt; i++) {
            boolean checkpointDue = i % 100 == 0;

            if (checkpointDue)
                forceCheckpoint();

            int key = i + o;

            cache.put(key, new byte[64 * 1024]);
        }
    }

    /**
     * Releases the WAL segment until it is no longer reported as reserved.
     *
     * @param n Node.
     * @param reserved Reserved segment, {@code null} means there is nothing to release.
     */
    protected void release(IgniteEx n, @Nullable WALPointer reserved) {
        if (reserved == null)
            return;

        while (walMgr(n).reserved(reserved))
            walMgr(n).release(reserved);
    }

    /**
     * Returns an instance of {@link SegmentAware} for the given ignite node.
     *
     * @param n Node.
     * @return Segment aware.
     */
    protected SegmentAware segmentAware(IgniteEx n) {
        return getFieldValue(walMgr(n), "segmentAware");
    }
}
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.cache;

import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.junit.Test;

import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.totalSize;

/**
 * Tests for the automatic release of WAL segments.
 */
public class AutoReleaseSegmentSelfTest extends AbstractReleaseSegmentTest {
    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        DataStorageConfiguration dsCfg = cfg.getDataStorageConfiguration();

        dsCfg.setWalSegmentSize((int)(2 * U.MB));
        dsCfg.setMaxWalArchiveSize(10 * U.MB);

        return cfg;
    }

    /**
     * Checks that a node starts successfully when, at the time of its start, the WAL archive already exceeds
     * {@link DataStorageConfiguration#getMaxWalArchiveSize()}: segments must not be released automatically
     * at that point, otherwise {@code GridCacheDatabaseSharedManager#applyLogicalUpdates} fails with an error.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testStartNodeWithExceedMaxWalArchiveSize() throws Exception {
        IgniteEx n = startGrid(0);

        n.cluster().state(ACTIVE);

        forceCheckpoint();
        enableCheckpoints(n, false);

        // Fill the WAL archive up to twice the configured maximum while checkpoints are disabled.
        for (int key = 0; totalSize(walMgr(n).walArchiveFiles()) < 20 * U.MB; key++)
            n.cache(DEFAULT_CACHE_NAME).put(key, new byte[(int)(64 * U.KB)]);

        stopGrid(0);
        startGrid(0);
    }
}

0 comments on commit d015707

Please sign in to comment.