Skip to content

Commit

Permalink
IGNITE-16560 [Native Persistence 3.0] PageMemoryImpl porting (#667)
Browse files Browse the repository at this point in the history
  • Loading branch information
tkalkirill committed Mar 2, 2022
1 parent c6731c3 commit f83ae45
Show file tree
Hide file tree
Showing 27 changed files with 4,895 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,23 @@ public static String toHexString(byte[] arr, int maxLen) {
return sb.toString().toUpperCase();
}

/**
* Returns hex representation of memory region.
*
* @param addr Pointer in memory.
* @param len How much byte to read.
*/
public static String toHexString(long addr, int len) {
StringBuilder sb = new StringBuilder(len * 2);

for (int i = 0; i < len; i++) {
// Can not use getLong because on little-endian it produces bs.
addByteAsHex(sb, GridUnsafe.getByte(addr + i));
}

return sb.toString();
}

/**
* Appends {@code byte} in hexadecimal format.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.pagememory.persistence;

import static org.apache.ignite.internal.configuration.ConfigurationTestUtils.fixConfiguration;
import static org.apache.ignite.internal.util.Constants.MiB;

import java.util.concurrent.TimeUnit;
import java.util.stream.LongStream;
import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionChange;
import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfiguration;
import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.TestPageIoRegistry;
import org.apache.ignite.internal.pagememory.mem.unsafe.UnsafeMemoryProvider;
import org.apache.ignite.internal.pagememory.tree.BplusTree;
import org.apache.ignite.internal.pagememory.tree.ItBplusTreeSelfTest;

/**
* Class to test the {@link BplusTree} with {@link PageMemoryImpl}.
*/
public class ItBplusTreePageMemoryImplTest extends ItBplusTreeSelfTest {
/** {@inheritDoc} */
@Override
protected PageMemory createPageMemory() throws Exception {
dataRegionCfg.change(c ->
c.convert(PageMemoryDataRegionChange.class)
.changePageSize(PAGE_SIZE)
.changeInitSize(MAX_MEMORY_SIZE)
.changeMaxSize(MAX_MEMORY_SIZE)
).get(1, TimeUnit.SECONDS);

long[] sizes = LongStream.range(0, CPUS + 1).map(i -> MAX_MEMORY_SIZE / CPUS).toArray();

sizes[CPUS] = 10 * MiB;

TestPageIoRegistry ioRegistry = new TestPageIoRegistry();

ioRegistry.loadFromServiceLoader();

return new PageMemoryImpl(
new UnsafeMemoryProvider(null),
(PageMemoryDataRegionConfiguration) fixConfiguration(dataRegionCfg),
ioRegistry,
sizes,
new TestPageReadWriteManager(),
(page, fullPageId, pageMemoryEx) -> {
}
);
}

/** {@inheritDoc} */
@Override
protected long acquiredPages() {
return ((PageMemoryImpl) pageMem).acquiredPages();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.pagememory.persistence;

import static org.apache.ignite.internal.configuration.ConfigurationTestUtils.fixConfiguration;
import static org.apache.ignite.internal.util.Constants.MiB;

import java.util.concurrent.TimeUnit;
import java.util.stream.LongStream;
import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionChange;
import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfiguration;
import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.TestPageIoRegistry;
import org.apache.ignite.internal.pagememory.mem.unsafe.UnsafeMemoryProvider;
import org.apache.ignite.internal.pagememory.tree.ItBplusTreeReuseSelfTest;

/**
* Test with reuse list and {@link PageMemoryImpl}.
*/
public class ItBplusTreeReuseListPageMemoryImplTest extends ItBplusTreeReuseSelfTest {
/** {@inheritDoc} */
@Override
protected PageMemory createPageMemory() throws Exception {
dataRegionCfg.change(c ->
c.convert(PageMemoryDataRegionChange.class)
.changePageSize(PAGE_SIZE)
.changeInitSize(MAX_MEMORY_SIZE)
.changeMaxSize(MAX_MEMORY_SIZE)
).get(1, TimeUnit.SECONDS);

long[] sizes = LongStream.range(0, CPUS + 1).map(i -> MAX_MEMORY_SIZE / CPUS).toArray();

sizes[CPUS] = 10 * MiB;

TestPageIoRegistry ioRegistry = new TestPageIoRegistry();

ioRegistry.loadFromServiceLoader();

return new PageMemoryImpl(
new UnsafeMemoryProvider(null),
(PageMemoryDataRegionConfiguration) fixConfiguration(dataRegionCfg),
ioRegistry,
sizes,
new TestPageReadWriteManager(),
(page, fullPageId, pageMemoryEx) -> {
}
);
}

/** {@inheritDoc} */
@Override
protected long acquiredPages() {
return ((PageMemoryImpl) pageMem).acquiredPages();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.runMultiThreaded;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.runMultiThreadedAsync;
import static org.apache.ignite.internal.util.Constants.MiB;
import static org.apache.ignite.internal.util.Constants.GiB;
import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -135,6 +135,10 @@ public class ItBplusTreeSelfTest extends BaseIgniteAbstractTest {

private static int RMV_INC = 1;

protected static final int PAGE_SIZE = 512;

protected static final long MAX_MEMORY_SIZE = GiB;

/** Forces printing lock/unlock events on the test tree. */
private static boolean PRINT_LOCKS = false;

Expand All @@ -146,7 +150,7 @@ public class ItBplusTreeSelfTest extends BaseIgniteAbstractTest {
PageMemoryDataRegionConfigurationSchema.class,
UnsafeMemoryAllocatorConfigurationSchema.class
})
private DataRegionConfiguration dataRegionCfg;
protected DataRegionConfiguration dataRegionCfg;

@Nullable
protected PageMemory pageMem;
Expand Down Expand Up @@ -2724,12 +2728,17 @@ public Long getLookupRow(BplusTree<Long, ?> tree, long pageAddr, int idx) {
}
}

private PageMemory createPageMemory() throws Exception {
/**
* Returns page memory.
*
* @throws Exception If failed.
*/
protected PageMemory createPageMemory() throws Exception {
dataRegionCfg.change(c ->
c.convert(PageMemoryDataRegionChange.class)
.changePageSize(512)
.changeInitSize(1024 * MiB)
.changeMaxSize(1024 * MiB)
.changePageSize(PAGE_SIZE)
.changeInitSize(MAX_MEMORY_SIZE)
.changeMaxSize(MAX_MEMORY_SIZE)
).get(1, TimeUnit.SECONDS);

TestPageIoRegistry ioRegistry = new TestPageIoRegistry();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.ignite.internal.pagememory;

import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;

import org.apache.ignite.lang.IgniteInternalCheckedException;

/**
Expand Down Expand Up @@ -44,6 +46,9 @@ public interface PageIdAllocator {
*/
int INDEX_PARTITION = 0xFFFF;

/** Group meta page id. */
long META_PAGE_ID = pageId(INDEX_PARTITION, FLAG_AUX, 0);

/**
* Allocates a page from the space for the given partition ID and the given flags.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.pagememory.persistence;

import static org.apache.ignite.internal.util.GridUnsafe.compareAndSwapLong;
import static org.apache.ignite.internal.util.GridUnsafe.getLong;
import static org.apache.ignite.internal.util.GridUnsafe.putLong;
import static org.apache.ignite.internal.util.GridUnsafe.zeroMemory;

import java.util.function.LongUnaryOperator;

/**
* Clock page replacement algorithm implementation.
*/
public class ClockPageReplacementFlags {
/** Total pages count. */
private final int pagesCnt;

/** Index of the next candidate ("hand"). */
private int curIdx;

/** Pointer to memory region to store page hit flags. */
private final long flagsPtr;

/**
* Constructor.
*
* @param totalPagesCnt Total pages count.
* @param memPtr Pointer to memory region.
*/
ClockPageReplacementFlags(int totalPagesCnt, long memPtr) {
pagesCnt = totalPagesCnt;
flagsPtr = memPtr;

zeroMemory(flagsPtr, (totalPagesCnt + 7) >> 3);
}

/**
* Find page to replace.
*
* @return Page index to replace.
*/
public int poll() {
// This method is always executed under exclusive lock, no other synchronization or CAS required.
while (true) {
if (curIdx >= pagesCnt) {
curIdx = 0;
}

long ptr = flagsPtr + ((curIdx >> 3) & (~7L));

long flags = getLong(ptr);

if (((curIdx & 63) == 0) && (flags == ~0L)) {
putLong(ptr, 0L);

curIdx += 64;

continue;
}

long mask = ~0L << curIdx;

int bitIdx = Long.numberOfTrailingZeros(~flags & mask);

if (bitIdx == 64) {
putLong(ptr, flags & ~mask);

curIdx = (curIdx & ~63) + 64;
} else {
mask &= ~(~0L << bitIdx);

putLong(ptr, flags & ~mask);

curIdx = (curIdx & ~63) + bitIdx + 1;

if (curIdx <= pagesCnt) {
return curIdx - 1;
}
}
}
}

/**
* Get page hit flag.
*
* @param pageIdx Page index.
*/
boolean getFlag(int pageIdx) {
long flags = getLong(flagsPtr + ((pageIdx >> 3) & (~7L)));

return (flags & (1L << pageIdx)) != 0L;
}

/**
* Clear page hit flag.
*
* @param pageIdx Page index.
*/
public void clearFlag(int pageIdx) {
compareAndSwapFlag(pageIdx, flags -> flags & ~(1L << pageIdx));
}

/**
* Set page hit flag.
*
* @param pageIdx Page index.
*/
public void setFlag(int pageIdx) {
compareAndSwapFlag(pageIdx, flags -> flags | (1L << pageIdx));
}

/**
* CAS page hit flag value.
*
* @param pageIdx Page index.
* @param func Function to apply to flags.
*/
private void compareAndSwapFlag(int pageIdx, LongUnaryOperator func) {
long ptr = flagsPtr + ((pageIdx >> 3) & (~7L));

long oldFlags;
long newFlags;

do {
oldFlags = getLong(ptr);
newFlags = func.applyAsLong(oldFlags);

if (oldFlags == newFlags) {
return;
}
} while (!compareAndSwapLong(null, ptr, oldFlags, newFlags));
}

/**
* Memory required to service {@code pagesCnt} pages.
*
* @param pagesCnt Pages count.
*/
public static long requiredMemory(int pagesCnt) {
return ((pagesCnt + 63) / 8) & (~7L) /* 1 bit per page + 8 byte align */;
}
}
Loading

0 comments on commit f83ae45

Please sign in to comment.