Skip to content

Commit

Permalink
HCOLL-397 msync before pointing to newly created tier bulk in the header
Browse files Browse the repository at this point in the history
  • Loading branch information
leventov committed Jan 31, 2016
1 parent 8891626 commit 299cd83
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 13 deletions.
12 changes: 12 additions & 0 deletions pom.xml
Expand Up @@ -101,6 +101,18 @@
<version>1.1.6</version>
</dependency>

<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
<version>4.2.1</version>
</dependency>

<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna-platform</artifactId>
<version>4.2.1</version>
</dependency>

<!-- optional dependencies -->

<dependency>
Expand Down
42 changes: 42 additions & 0 deletions src/main/java/net/openhft/chronicle/hash/impl/PosixMsync.java
@@ -0,0 +1,42 @@
/*
* Copyright (C) 2015 higherfrequencytrading.com
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package net.openhft.chronicle.hash.impl;

import com.sun.jna.Native;
import com.sun.jna.Platform;

import java.io.IOException;

/**
 * JNA direct-mapped binding to the C library's {@code msync}, used to synchronously flush
 * a range of a memory mapping on non-Windows platforms.
 */
final class PosixMsync {

    /**
     * Value of the {@code MS_SYNC} flag on the current platform. The numeric value is not
     * portable between operating systems, so it is resolved once at class initialization.
     */
    private static final int MS_SYNC = msSyncFlag();

    /**
     * Calls {@code msync(addr, length, MS_SYNC)}.
     *
     * @param addr start address of the range to flush
     * @param length length of the range in bytes
     * @throws IOException if {@code msync} returns -1; the message carries the error number
     *         reported by {@link Native#getLastError()} and its {@code strerror} description
     */
    public static void msync(long addr, long length) throws IOException {
        if (msync(addr, length, MS_SYNC) == -1) {
            int errno = Native.getLastError();
            throw new IOException("msync failed, errno " + errno + ": " + strerror(errno));
        }
    }

    /**
     * Picks the {@code MS_SYNC} value from the platform's {@code <sys/mman.h>}: 0x10 on macOS
     * (where 4 means {@code MS_KILLPAGES}), 0 on FreeBSD, 2 on OpenBSD, and 4 otherwise
     * (Linux and the remaining platforms, as the code assumed before).
     */
    private static int msSyncFlag() {
        if (Platform.isMac()) {
            return 0x10;
        }
        if (Platform.isFreeBSD()) {
            return 0;
        }
        if (Platform.isOpenBSD()) {
            return 2;
        }
        return 4;
    }

    // strerror() is used instead of strerror_r(): it returns char* on every POSIX libc, whereas
    // strerror_r() returns char* only in its GNU variant and int in the XSI variant, which must
    // not be mapped to a String return type.
    private static native String strerror(int errnum);

    private static native int msync(long addr, long length, int flags);

    static {
        Native.register(PosixMsync.class, Platform.C_LIBRARY_NAME);
    }

    private PosixMsync() {}
}
Expand Up @@ -731,10 +731,24 @@ public long allocateTier(int forSegmentIndex, int tier) {
}

/**
 * Allocates a tier bulk by delegating to {@link #allocateTierBulk0(RandomAccessFile)}.
 * When {@code persisted()} is true the backing {@code file} is opened in "rw" mode for the
 * duration of the call; otherwise {@code null} is passed.
 *
 * @throws RuntimeException wrapping any {@link IOException} raised while opening the file,
 *         allocating the bulk, or closing the file
 */
private void allocateTierBulk() {
    // try-with-resources skips close() for a null resource, and a failure in close() is
    // attached as a suppressed exception instead of replacing the primary one.
    try (RandomAccessFile raf = persisted() ? new RandomAccessFile(file, "rw") : null) {
        allocateTierBulk0(raf);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

private void allocateTierBulk0(RandomAccessFile raf) throws IOException {
int allocatedExtraTierBulks = globalMutableState.getAllocatedExtraTierBulks();
mapTierBulks(allocatedExtraTierBulks);

// integer overflow aware
mapTierBulks(raf, allocatedExtraTierBulks);

long firstTierIndex = extraTierIndexToTierIndex(allocatedExtraTierBulks * tiersInBulk);
BytesStore tierBytesStore = tierBytesStore(firstTierIndex);
long firstTierOffset = tierBytesOffset(firstTierIndex);
Expand All @@ -746,13 +760,30 @@ private void allocateTierBulk() {
long lastTierIndex = firstTierIndex + tiersInBulk - 1;
linkAndZeroOutFreeTiers(firstTierIndex, lastTierIndex);

// TODO HCOLL-397 insert msync here!
if (persisted()) {
long address = tierBytesStore.address(firstTierOffset - tierBulkInnerOffsetToTiers);
// address should be a multiple of page size
if (OS.pageAlign(address) != address) {
address = OS.pageAlign(address) - OS.pageSize();
}
long endAddress = tierBytesStore.address(tierBytesOffset(lastTierIndex)) + tierSize;
long length = endAddress - address;
msync(raf, address, length);
}

// after we are sure the new bulk is initialized, update the global mutable state
globalMutableState.setAllocatedExtraTierBulks(allocatedExtraTierBulks + 1);
globalMutableState.setFirstFreeTierIndex(firstTierIndex);
}

/**
 * Flushes the given address range using the platform-specific implementation:
 * {@code WindowsMsync} when {@code OS.isWindows()} is true, {@code PosixMsync} otherwise.
 *
 * @param raf file handle, passed on only to the Windows implementation
 * @param address start address of the range to flush
 * @param length length of the range in bytes
 * @throws IOException if the selected implementation reports a failure
 */
private static void msync(RandomAccessFile raf, long address, long length) throws IOException {
    if (!OS.isWindows()) {
        PosixMsync.msync(address, length);
        return;
    }
    WindowsMsync.msync(raf, address, length);
}

public void linkAndZeroOutFreeTiers(long firstTierIndex, long lastTierIndex) {
for (long tierIndex = firstTierIndex; tierIndex <= lastTierIndex; tierIndex++) {
long tierOffset = tierBytesOffset(tierIndex);
Expand Down Expand Up @@ -822,9 +853,23 @@ protected long tierAddr(TierBulkData tierBulkData, long tierIndexOffsetWithinBul
}

/**
 * Convenience overload of {@link #mapTierBulks(RandomAccessFile, int)} that manages the file
 * handle itself: when {@code persisted()} is true the backing {@code file} is opened in "rw"
 * mode for the duration of the call; otherwise {@code null} is passed.
 *
 * @param upToBulkIndex bulk index forwarded unchanged to the two-argument overload
 * @throws RuntimeException wrapping any {@link IOException} raised while opening or
 *         closing the file
 */
private void mapTierBulks(int upToBulkIndex) {
    // try-with-resources skips close() for a null resource, and a failure in close() is
    // attached as a suppressed exception instead of replacing the primary one.
    try (RandomAccessFile raf = persisted() ? new RandomAccessFile(file, "rw") : null) {
        mapTierBulks(raf, upToBulkIndex);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

private void mapTierBulks(RandomAccessFile raf, int upToBulkIndex) {
if (persisted()) {
try {
mapTierBulksMapped(upToBulkIndex);
mapTierBulksMapped(raf, upToBulkIndex);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand All @@ -834,7 +879,7 @@ private void mapTierBulks(int upToBulkIndex) {
}
}

private void mapTierBulksMapped(int upToBulkIndex) throws IOException {
private void mapTierBulksMapped(RandomAccessFile raf, int upToBulkIndex) throws IOException {
int firstBulkToMapIndex = tierBulkOffsets.size();
int bulksToMap = upToBulkIndex + 1 - firstBulkToMapIndex;
long mapSize = bulksToMap * tierBulkSizeInBytes;
Expand All @@ -852,14 +897,11 @@ private void mapTierBulksMapped(int upToBulkIndex) throws IOException {
// Now might need to have bigger mapSize
mapSize += firstBulkToMapOffsetWithinMapping;
}
// TODO optimize -- create a file channel not on each tier bulk creation
try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
// mapping by hand, because MappedFile/MappedBytesStore doesn't allow to create a BS
// which starts not from the beginning of the file, but has start() of 0
NativeBytesStore extraStore = map(raf, mapSize, mappingOffsetInFile);
appendBulkData(firstBulkToMapIndex, upToBulkIndex, extraStore,
firstBulkToMapOffsetWithinMapping);
}
// Map by hand, because MappedFile/MappedBytesStore cannot create a BytesStore that starts
// at a non-zero offset in the file yet has a start() of 0
NativeBytesStore extraStore = map(raf, mapSize, mappingOffsetInFile);
appendBulkData(firstBulkToMapIndex, upToBulkIndex, extraStore,
firstBulkToMapOffsetWithinMapping);
}

/**
Expand Down
61 changes: 61 additions & 0 deletions src/main/java/net/openhft/chronicle/hash/impl/WindowsMsync.java
@@ -0,0 +1,61 @@
/*
* Copyright (C) 2015 higherfrequencytrading.com
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package net.openhft.chronicle.hash.impl;

import com.sun.jna.Native;
import com.sun.jna.platform.win32.Kernel32Util;

import java.io.IOException;
import java.io.RandomAccessFile;

import static com.sun.jna.platform.win32.WinError.ERROR_LOCK_VIOLATION;

/**
 * JNA direct-mapped binding to kernel32's {@code FlushViewOfFile}, used to flush a range of a
 * memory-mapped file view on Windows.
 */
final class WindowsMsync {

    /** Maximum number of {@code FlushViewOfFile} attempts before giving up. */
    private static final int MAX_FLUSH_ATTEMPTS = 3;

    /**
     * Flushes the given range of a mapped view and then forces the file channel.
     *
     * @param raf file backing the mapping; its channel is forced after a successful flush
     * @param addr start address of the range to flush
     * @param length length of the range in bytes
     * @throws IOException if {@code FlushViewOfFile} fails with an error other than
     *         {@code ERROR_LOCK_VIOLATION}, or keeps failing with it on every attempt; the
     *         message is the system text for the last error code
     */
    public static void msync(RandomAccessFile raf, long addr, long length)
            throws IOException {
        int lastError = 0;
        boolean flushed = false;
        // FlushViewOfFile can fail with ERROR_LOCK_VIOLATION if the memory system is writing
        // dirty pages to disk. As there is no way to synchronize the flushing, retry a limited
        // number of times.
        for (int attempt = 0; attempt < MAX_FLUSH_ATTEMPTS; attempt++) {
            if (FlushViewOfFile(addr, length)) {
                flushed = true;
                break;
            }
            // Read the code JNA captured for this thread right after the native call. A
            // separately bound GetLastError() can observe a value already overwritten by
            // intervening system calls.
            lastError = Native.getLastError();
            if (lastError != ERROR_LOCK_VIOLATION) {
                break;
            }
        }

        if (!flushed) {
            throw new IOException(Kernel32Util.formatMessage(lastError));
        }
        // Finally calls FlushFileBuffers
        raf.getChannel().force(false);
    }

    private static native boolean FlushViewOfFile(long addr, long length);

    static {
        Native.register(WindowsMsync.class, "kernel32");
    }

    private WindowsMsync() {}
}

0 comments on commit 299cd83

Please sign in to comment.