Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.util.GridUnsafe;
Expand Down Expand Up @@ -148,10 +147,6 @@ public int realPageSize() {
+ "],\nsuper = ["
+ super.toString() + "]]";
}
catch (IgniteCheckedException ignored) {
return "Error during call 'toString' of PageSnapshot [fullPageId=" + fullPageId() +
", pageData = " + Arrays.toString(pageData()) + ", super=" + super.toString() + "]";
}
finally {
GridUnsafe.cleanDirectBuffer(buf);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2243,6 +2243,8 @@ else if (plc != PartitionLossPolicy.IGNORE) {
ctx.database().checkpointReadLock();

try {
Map<UUID, Set<Integer>> addToWaitGroups = new HashMap<>();

lock.writeLock().lock();

try {
Expand Down Expand Up @@ -2274,7 +2276,7 @@ else if (plc != PartitionLossPolicy.IGNORE) {

GridDhtPartitionState state = partMap.get(part);

if (state == null || state != OWNING)
if (state != OWNING)
continue;

if (!newOwners.contains(remoteNodeId)) {
Expand All @@ -2294,9 +2296,7 @@ else if (plc != PartitionLossPolicy.IGNORE) {
UUID nodeId = entry.getKey();
Set<Integer> rebalancedParts = entry.getValue();

// Add to wait groups to ensure late assignment switch after all partitions are rebalanced.
for (Integer part : rebalancedParts)
ctx.cache().context().affinity().addToWaitGroup(groupId(), part, nodeId, topologyVersionFuture().initialVersion());
addToWaitGroups.put(nodeId, new HashSet<>(rebalancedParts));

if (!rebalancedParts.isEmpty()) {
Set<Integer> historical = rebalancedParts.stream()
Expand All @@ -2315,9 +2315,22 @@ else if (plc != PartitionLossPolicy.IGNORE) {
}

node2part = new GridDhtPartitionFullMap(node2part, updateSeq.incrementAndGet());
} finally {
}
finally {
lock.writeLock().unlock();
}

for (Map.Entry<UUID, Set<Integer>> entry : addToWaitGroups.entrySet()) {
// Add to wait groups to ensure late assignment switch after all partitions are rebalanced.
for (Integer part : entry.getValue()) {
ctx.cache().context().affinity().addToWaitGroup(
groupId(),
part,
entry.getKey(),
topologyVersionFuture().initialVersion()
);
}
}
}
finally {
ctx.database().checkpointReadUnlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,10 @@
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.METASTORE_DATA_RECORD;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.fromOrdinal;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_FILE_MATCHER;
import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getType;
import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getVersion;
import static org.apache.ignite.internal.util.IgniteUtils.checkpointBufferSize;
import static org.apache.ignite.internal.util.IgniteUtils.hexLong;

/**
*
Expand Down Expand Up @@ -941,7 +944,7 @@ private void finishRecovery() throws IgniteCheckedException {

long time = System.currentTimeMillis();

checkpointReadLock();
CHECKPOINT_LOCK_HOLD_COUNT.set(CHECKPOINT_LOCK_HOLD_COUNT.get() + 1);

try {
for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext()))
Expand Down Expand Up @@ -973,7 +976,7 @@ private void finishRecovery() throws IgniteCheckedException {
throw e;
}
finally {
checkpointReadUnlock();
CHECKPOINT_LOCK_HOLD_COUNT.set(CHECKPOINT_LOCK_HOLD_COUNT.get() - 1);
}
}

Expand Down Expand Up @@ -3059,53 +3062,49 @@ private void finalizeCheckpointOnRecovery(
int innerIdx = i;

exec.execute(stripeIdx, () -> {
PageStoreWriter pageStoreWriter = (fullPageId, buf, tag) -> {
assert tag != PageMemoryImpl.TRY_AGAIN_TAG : "Lock is held by other thread for page " + fullPageId;

int groupId = fullPageId.groupId();
long pageId = fullPageId.pageId();

// Write buf to page store.
PageStore store = storeMgr.writeInternal(groupId, pageId, buf, tag, true);

// Save store for future fsync.
updStores.add(store);
};

// Local buffer for write pages.
ByteBuffer writePageBuf = ByteBuffer.allocateDirect(pageSize());

writePageBuf.order(ByteOrder.nativeOrder());

Collection<FullPageId> pages0 = pages.innerCollection(innerIdx);

FullPageId pageId = null;
FullPageId fullPageId = null;

try {
for (FullPageId fullId : pages0) {
// Fail-fast break if some exception occurred.
if (writePagesError.get() != null)
break;

writePageBuf.rewind();
// Save pageId to local variable for future using if exception occurred.
fullPageId = fullId;

PageMemoryEx pageMem = getPageMemoryForCacheGroup(fullId.groupId());

// Write page content to writePageBuf.
Integer tag = pageMem.getForCheckpoint(fullId, writePageBuf, null);

assert tag == null || tag != PageMemoryImpl.TRY_AGAIN_TAG :
"Lock is held by other thread for page " + fullId;

if (tag != null) {
writePageBuf.rewind();

// Save pageId to local variable for future using if exception occurred.
pageId = fullId;

// Write writePageBuf to page store.
PageStore store = storeMgr.writeInternal(
fullId.groupId(), fullId.pageId(), writePageBuf, tag, true);

writePageBuf.rewind();

// Save store for future fsync.
updStores.add(store);
}
// Write page content to page store via pageStoreWriter.
// Tracker is null, because no need to track checkpoint metrics on recovery.
pageMem.checkpointWritePage(fullId, writePageBuf, pageStoreWriter, null);
}

// Add number of handled pages.
cpPagesCnt.addAndGet(pages0.size());
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to write page to pageStore, pageId=" + pageId);
U.error(log, "Failed to write page to pageStore, pageId=" + fullPageId);

writePagesError.compareAndSet(null, e);
}
Expand Down Expand Up @@ -4765,10 +4764,14 @@ private WriteCheckpointPages(
* @return pagesToRetry Pages which should be retried.
*/
private List<FullPageId> writePages(Collection<FullPageId> writePageIds) throws IgniteCheckedException {
ByteBuffer tmpWriteBuf = threadBuf.get();

List<FullPageId> pagesToRetry = new ArrayList<>();

CheckpointMetricsTracker tracker = persStoreMetrics.metricsEnabled() ? this.tracker : null;

PageStoreWriter pageStoreWriter = createPageStoreWriter(pagesToRetry);

ByteBuffer tmpWriteBuf = threadBuf.get();

for (FullPageId fullId : writePageIds) {
if (checkpointer.shutdownNow)
break;
Expand Down Expand Up @@ -4799,37 +4802,48 @@ else if (grpId == TxLog.TX_LOG_CACHE_ID)
pageMem = (PageMemoryEx)region.pageMemory();
}

Integer tag = pageMem.getForCheckpoint(
fullId, tmpWriteBuf, persStoreMetrics.metricsEnabled() ? tracker : null);
pageMem.checkpointWritePage(fullId, tmpWriteBuf, pageStoreWriter, tracker);
}

if (tag != null) {
return pagesToRetry;
}

/**
* Factory method for create {@link PageStoreWriter}.
*
* @param pagesToRetry List pages for retry.
* @return Checkpoint page write context.
*/
private PageStoreWriter createPageStoreWriter(List<FullPageId> pagesToRetry) {
return new PageStoreWriter() {
/** {@inheritDoc} */
@Override public void writePage(FullPageId fullPageId, ByteBuffer buf, int tag) throws IgniteCheckedException {
if (tag == PageMemoryImpl.TRY_AGAIN_TAG) {
pagesToRetry.add(fullId);
pagesToRetry.add(fullPageId);

continue;
return;
}

assert PageIO.getType(tmpWriteBuf) != 0 : "Invalid state. Type is 0! pageId = " + U.hexLong(fullId.pageId());
assert PageIO.getVersion(tmpWriteBuf) != 0 : "Invalid state. Version is 0! pageId = " + U.hexLong(fullId.pageId());
int groupId = fullPageId.groupId();
long pageId = fullPageId.pageId();

tmpWriteBuf.rewind();
assert getType(buf) != 0 : "Invalid state. Type is 0! pageId = " + hexLong(pageId);
assert getVersion(buf) != 0 : "Invalid state. Version is 0! pageId = " + hexLong(pageId);

if (persStoreMetrics.metricsEnabled()) {
int pageType = PageIO.getType(tmpWriteBuf);
int pageType = getType(buf);

if (PageIO.isDataPageType(pageType))
tracker.onDataPageWritten();
}

writtenPagesCntr.incrementAndGet();

PageStore store = storeMgr.writeInternal(grpId, fullId.pageId(), tmpWriteBuf, tag, true);
PageStore store = storeMgr.writeInternal(groupId, pageId, buf, tag, true);

updStores.computeIfAbsent(store, k -> new LongAdder()).increment();
}
}

return pagesToRetry;
};
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache.persistence;

import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;

/**
 * Callback interface for writing a page to a {@link PageStore}.
 * Passed into checkpoint/page-memory code so the caller decides how a copied page is persisted.
 */
public interface PageStoreWriter {
    /**
     * Callback invoked to write one page. {@link PageMemoryEx} will copy page content to {@code buf} before call.
     *
     * @param fullPageId Page ID to get byte buffer for. The page ID must be present in the collection returned by
     *      the {@link PageMemoryEx#beginCheckpoint()} method call.
     * @param buf Temporary buffer holding the copied page content to write.
     * @param tag Partition generation tag for the copied page. NOTE(review): {@code tag} is a primitive
     *      {@code int} and can never be {@code null}; implementations may instead receive a special retry
     *      marker (e.g. {@code PageMemoryImpl.TRY_AGAIN_TAG}) when the page copy must be retried —
     *      TODO confirm the full tag contract against {@code PageMemoryEx}.
     * @throws IgniteCheckedException If write page failed.
     */
    void writePage(FullPageId fullPageId, ByteBuffer buf, int tag) throws IgniteCheckedException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter;
import org.apache.ignite.internal.util.GridUnsafe;
import org.jetbrains.annotations.Nullable;

Expand All @@ -28,9 +29,9 @@
* content without holding segment lock. Page data is copied into temp buffer during {@link #writePage(FullPageId,
* ByteBuffer, int)} and then sent to real implementation by {@link #finishReplacement()}.
*/
public class DelayedDirtyPageWrite implements ReplacedPageWriter {
public class DelayedDirtyPageStoreWrite implements PageStoreWriter {
/** Real flush dirty page implementation. */
private final ReplacedPageWriter flushDirtyPage;
private final PageStoreWriter flushDirtyPage;

/** Page size. */
private final int pageSize;
Expand All @@ -56,9 +57,12 @@ public class DelayedDirtyPageWrite implements ReplacedPageWriter {
* @param pageSize page size.
* @param tracker tracker to lock/unlock page reads.
*/
public DelayedDirtyPageWrite(ReplacedPageWriter flushDirtyPage,
ThreadLocal<ByteBuffer> byteBufThreadLoc, int pageSize,
DelayedPageReplacementTracker tracker) {
public DelayedDirtyPageStoreWrite(
PageStoreWriter flushDirtyPage,
ThreadLocal<ByteBuffer> byteBufThreadLoc,
int pageSize,
DelayedPageReplacementTracker tracker
) {
this.flushDirtyPage = flushDirtyPage;
this.pageSize = pageSize;
this.byteBufThreadLoc = byteBufThreadLoc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter;

/**
* Delayed page writes tracker. Provides delayed write implementations and allows to check if page is actually being
Expand All @@ -35,7 +36,7 @@ public class DelayedPageReplacementTracker {
private final int pageSize;

/** Flush dirty page real implementation. */
private final ReplacedPageWriter flushDirtyPage;
private final PageStoreWriter flushDirtyPage;

/** Logger. */
private final IgniteLogger log;
Expand All @@ -56,20 +57,24 @@ public class DelayedPageReplacementTracker {
};

/**
* Dirty page write for replacement operations thread local. Because page write {@link DelayedDirtyPageWrite} is
* Dirty page write for replacement operations thread local. Because page write {@link DelayedDirtyPageStoreWrite} is
* stateful and not thread safe, this thread local protects from GC pressure on pages replacement. <br> Map is used
* instead of build-in thread local to allow GC to remove delayed writers for alive threads after node stop.
*/
private final Map<Long, DelayedDirtyPageWrite> delayedPageWriteThreadLocMap = new ConcurrentHashMap<>();
private final Map<Long, DelayedDirtyPageStoreWrite> delayedPageWriteThreadLocMap = new ConcurrentHashMap<>();

/**
* @param pageSize Page size.
* @param flushDirtyPage Flush dirty page.
* @param log Logger.
* @param segmentCnt Segments count.
*/
public DelayedPageReplacementTracker(int pageSize, ReplacedPageWriter flushDirtyPage,
IgniteLogger log, int segmentCnt) {
public DelayedPageReplacementTracker(
int pageSize,
PageStoreWriter flushDirtyPage,
IgniteLogger log,
int segmentCnt
) {
this.pageSize = pageSize;
this.flushDirtyPage = flushDirtyPage;
this.log = log;
Expand All @@ -82,9 +87,9 @@ public DelayedPageReplacementTracker(int pageSize, ReplacedPageWriter flushDirty
/**
* @return delayed page write implementation, finish method to be called to actually write page.
*/
public DelayedDirtyPageWrite delayedPageWrite() {
public DelayedDirtyPageStoreWrite delayedPageWrite() {
return delayedPageWriteThreadLocMap.computeIfAbsent(Thread.currentThread().getId(),
id -> new DelayedDirtyPageWrite(flushDirtyPage, byteBufThreadLoc, pageSize, this));
id -> new DelayedDirtyPageStoreWrite(flushDirtyPage, byteBufThreadLoc, pageSize, this));
}

/**
Expand Down
Loading