Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bytes/issues/612 #614

Merged
merged 9 commits into from
Mar 21, 2024
44 changes: 30 additions & 14 deletions src/main/java/net/openhft/chronicle/bytes/MappedBytesStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class MappedBytesStore extends NativeBytesStore<Void> {
private final MappedFile mappedFile;
private final long start;
private final long safeLimit;
private final int pageSize;
private SyncMode syncMode = MappedFile.DEFAULT_SYNC_MODE;
private long syncLength = 0;

Expand Down Expand Up @@ -98,6 +99,7 @@ protected MappedBytesStore(ReferenceOwner owner, MappedFile mappedFile, @NonNega
: MappedBytesStore::readWriteOk;

reserveTransfer(INIT, owner);
this.pageSize = pageSize;
}

/**
Expand Down Expand Up @@ -438,23 +440,30 @@ public long appendUtf8(@NonNegative long pos, char[] chars, @NonNegative int off
@Override
protected void performRelease() {
if (address != 0 && syncMode != SyncMode.NONE && OS.isLinux()) {
performMsync(0, safeLimit - start);
performMsync(0, safeLimit - start, this.syncMode());
}
// must sync before releasing
super.performRelease();
}

private void performMsync(@NonNegative long offset, long length) {
final SyncMode syncMode = this.syncMode();
/**
* Sync the ByteStore if required.
*
* @param offset the offset within the ByteStore from the start to sync, offset must be a multiple of 4K
* @param length the length to sync, length must be a multiple of 4K
* @param syncMode the mode to sync
*/
private void performMsync(@NonNegative long offset, long length, SyncMode syncMode) {
if (syncMode == SyncMode.NONE)
return;
long start0 = System.currentTimeMillis();
boolean full = offset == 0;
int ret = PosixAPI.posix().msync(address + offset, length, syncMode.mSyncFlag());
if (ret != 0)
Jvm.error().on(MappedBytesStore.class, "msync failed, " + PosixAPI.posix().lastErrorStr() + ", ret=" + ret + " " + mappedFile.file() + " " + Long.toHexString(offset) + " " + Long.toHexString(length));
long time0 = System.currentTimeMillis() - start0;
if (time0 >= 200)
Jvm.perf().on(getClass(), "Took " + time0 / 1e3 + " seconds to " + syncMode + " " + mappedFile.file());
Jvm.perf().on(getClass(), "Took " + time0 + " ms to " + syncMode + " " + mappedFile.file() + (full ? " (full)" : ""));
}

/**
Expand All @@ -479,19 +488,26 @@ public void syncMode(SyncMode syncMode) {
* @param position to sync with the syncMode()
*/
public void syncUpTo(long position) {
if (syncMode == SyncMode.NONE || address == 0 || refCount() <= 0)
syncUpTo(position, this.syncMode);
}

/**
* Synchronise from the last complete page up to this position.
*
* @param position to sync with the syncMode()
* @param syncMode to use
*/
public void syncUpTo(long position, SyncMode syncMode) {
if (syncMode == SyncMode.NONE || address == 0 || refCount() <= 0 || !OS.isLinux())
return;
long length = position - start;
if (length <= syncLength)
long positionFromStart = Math.min(limit, position) - start;
if (positionFromStart <= syncLength)
return;
final long maxLength = safeLimit - start;
if (length > maxLength)
length = maxLength;
int mask = ~0xFFF;
long pageEnd = (length + 0xFFF) & mask;
int mask = - pageSize;
long pageEnd = (positionFromStart + pageSize - 1) & mask;
long syncStart = syncLength & mask;
final long length2 = pageEnd - syncStart;
performMsync(syncStart, length2);
syncLength = position;
performMsync(syncStart, length2, syncMode);
syncLength = positionFromStart;
}
}
119 changes: 119 additions & 0 deletions src/test/java/net/openhft/chronicle/bytes/MappedBytesStoreTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright (c) 2016-2022 chronicle.software
*
* https://chronicle.software
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.openhft.chronicle.bytes;

import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.core.io.ClosedIllegalStateException;
import net.openhft.chronicle.core.io.ReferenceOwner;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.io.IOException;

import static org.junit.Assert.*;

public class MappedBytesStoreTest extends BytesTestCommon implements ReferenceOwner {
public static final int PAGE_SIZE = OS.defaultOsPageSize();
private MappedFile mappedFile;
private MappedBytesStore mappedBytesStore;

@Before
public void setup() throws IOException {
String filePath = OS.getTarget() + "/test" + System.nanoTime() + ".deleteme";
mappedFile = MappedFile.mappedFile(filePath, PAGE_SIZE, PAGE_SIZE);
mappedBytesStore = mappedFile.acquireByteStore(this, 0);
new File(filePath).deleteOnExit();
}

@After
public void tearDown() {
if (mappedBytesStore != null)
mappedBytesStore.release(this);
mappedFile.close();
}

@Test
public void testWriteReadBytes() throws ClosedIllegalStateException {
byte value = 123;
long position = 5;
mappedBytesStore.writeByte(position, value);

byte readValue = mappedBytesStore.readByte(position);
assertEquals("Written and read values should be equal", value, readValue);
}

@Test(expected = IllegalStateException.class)
public void testWriteAfterClose() {
try {
mappedBytesStore.release(this);
mappedBytesStore.release(ReferenceOwner.INIT);
mappedBytesStore.writeByte(0, (byte) 1);
} finally {
mappedBytesStore = null;
}
}

@Test
public void testSafeLimit() {
assertTrue("Position within safe limit should be valid", mappedBytesStore.inside(0));
assertFalse("Position beyond safe limit should be invalid", mappedBytesStore.inside(mappedBytesStore.safeLimit()));
}

@Test
public void testCapacity() {
assertEquals("The capacities should match", PAGE_SIZE * 2, mappedBytesStore.capacity());
}

@Test
public void testLockRegion() throws IOException {
// Try to lock a region of the file
assertNotNull("Lock should be obtained", mappedBytesStore.tryLock(0, 10, true));
}

@Test
public void testByteBufferReadWrite() throws ClosedIllegalStateException {
byte[] writeBytes = new byte[10];
for (byte i = 0; i < 10; i++) {
writeBytes[i] = i;
}
mappedBytesStore.write(0, writeBytes, 0, writeBytes.length);

byte[] readBytes = new byte[10];
mappedBytesStore.read(0, readBytes, 0, 10);

assertArrayEquals("Buffer content should match", writeBytes, readBytes);
}

@Test
public void testSyncUpTo() throws IOException {
mappedBytesStore.syncUpTo(0);
mappedBytesStore.syncUpTo(1000);
mappedBytesStore.syncUpTo(5000);
mappedBytesStore.syncUpTo(1000000);

mappedBytesStore.release(this);
mappedBytesStore = mappedFile.acquireByteStore(this, OS.pageSize());

mappedBytesStore.syncUpTo(0);
mappedBytesStore.syncUpTo(1000);
mappedBytesStore.syncUpTo(5000);
mappedBytesStore.syncUpTo(1000000);
}
}