Skip to content

Commit

Permalink
Move memory locking into NativeAccess (#108829)
Browse files Browse the repository at this point in the history
This commit moves the implementations of locking virtual memory into RAM
into NativeAccess.

relates #104876
  • Loading branch information
rjernst committed May 21, 2024
1 parent 04f1fce commit 062039b
Show file tree
Hide file tree
Showing 27 changed files with 687 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

<!-- JNA requires the no-argument constructor on JNAKernel32Library.SizeT to be public-->
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]bootstrap[/\\]JNAKernel32Library.java" checks="RedundantModifier" />
<!-- JNA requires the no-argument constructor on JnaKernel32Library.SizeT to be public -->
<suppress files="libs[/\\]native[/\\]jna[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]nativeaccess[/\\]jna[/\\]JnaKernel32Library.java" checks="RedundantModifier" />

<!-- the constructors on some local classes in these tests must be public-->
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]plugins[/\\]PluginsServiceTests.java" checks="RedundantModifier" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.nativeaccess.jna;

import com.sun.jna.IntegerType;
import com.sun.jna.Native;
import com.sun.jna.NativeLong;
import com.sun.jna.Pointer;
import com.sun.jna.Structure;
import com.sun.jna.win32.StdCallLibrary;

import org.elasticsearch.nativeaccess.lib.Kernel32Library;

import java.util.List;

class JnaKernel32Library implements Kernel32Library {
private static class JnaHandle implements Handle {
final Pointer pointer;

JnaHandle(Pointer pointer) {
this.pointer = pointer;
}
}

static class JnaAddress implements Address {
final Pointer pointer;

JnaAddress(Pointer pointer) {
this.pointer = pointer;
}

@Override
public Address add(long offset) {
return new JnaAddress(new Pointer(Pointer.nativeValue(pointer) + offset));
}
}

public static class SizeT extends IntegerType {
// JNA requires this no-arg constructor to be public,
// otherwise it fails to register kernel32 library
public SizeT() {
this(0);
}

public SizeT(long value) {
super(Native.SIZE_T_SIZE, value);
}
}

/**
* @see MemoryBasicInformation
*/
public static class JnaMemoryBasicInformation extends Structure implements MemoryBasicInformation {
// note: these members must be public for jna to set them
public Pointer BaseAddress = new Pointer(0);
public byte[] _ignore = new byte[16];
public SizeT RegionSize = new SizeT();
public NativeLong State;
public NativeLong Protect;
public NativeLong Type;

@Override
protected List<String> getFieldOrder() {
return List.of("BaseAddress", "_ignore", "RegionSize", "State", "Protect", "Type");
}

@Override
public Address BaseAddress() {
return new JnaAddress(BaseAddress);
}

@Override
public long RegionSize() {
return RegionSize.longValue();
}

@Override
public long State() {
return State.longValue();
}

@Override
public long Protect() {
return Protect.longValue();
}

@Override
public long Type() {
return Type.longValue();
}
}

private interface NativeFunctions extends StdCallLibrary {
Pointer GetCurrentProcess();

boolean CloseHandle(Pointer handle);

boolean VirtualLock(Pointer address, SizeT size);

int VirtualQueryEx(Pointer handle, Pointer address, JnaMemoryBasicInformation memoryInfo, int length);

boolean SetProcessWorkingSetSize(Pointer handle, SizeT minSize, SizeT maxSize);
}

private final NativeFunctions functions;

JnaKernel32Library() {
this.functions = Native.load("kernel32", NativeFunctions.class);
}

@Override
public Handle GetCurrentProcess() {
return new JnaHandle(functions.GetCurrentProcess());
}

@Override
public boolean CloseHandle(Handle handle) {
assert handle instanceof JnaHandle;
var jnaHandle = (JnaHandle) handle;
return functions.CloseHandle(jnaHandle.pointer);
}

@Override
public int GetLastError() {
// JNA does not like linking direclty to GetLastError, so we must use the Native helper function
return Native.getLastError();
}

@Override
public MemoryBasicInformation newMemoryBasicInformation() {
return new JnaMemoryBasicInformation();
}

@Override
public boolean VirtualLock(Address address, long size) {
assert address instanceof JnaAddress;
var jnaAddress = (JnaAddress) address;
return functions.VirtualLock(jnaAddress.pointer, new SizeT(size));
}

@Override
public int VirtualQueryEx(Handle handle, Address address, MemoryBasicInformation memoryInfo) {
assert handle instanceof JnaHandle;
assert address instanceof JnaAddress;
assert memoryInfo instanceof JnaMemoryBasicInformation;
var jnaHandle = (JnaHandle) handle;
var jnaAddress = (JnaAddress) address;
var jnaMemoryInfo = (JnaMemoryBasicInformation) memoryInfo;
return functions.VirtualQueryEx(jnaHandle.pointer, jnaAddress.pointer, jnaMemoryInfo, jnaMemoryInfo.size());
}

@Override
public boolean SetProcessWorkingSetSize(Handle handle, long minSize, long maxSize) {
assert handle instanceof JnaHandle;
var jnaHandle = (JnaHandle) handle;
return functions.SetProcessWorkingSetSize(jnaHandle.pointer, new SizeT(minSize), new SizeT(maxSize));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.nativeaccess.jna;

import org.elasticsearch.nativeaccess.lib.JavaLibrary;
import org.elasticsearch.nativeaccess.lib.Kernel32Library;
import org.elasticsearch.nativeaccess.lib.NativeLibrary;
import org.elasticsearch.nativeaccess.lib.NativeLibraryProvider;
import org.elasticsearch.nativeaccess.lib.PosixCLibrary;
Expand All @@ -29,6 +30,8 @@ public JnaNativeLibraryProvider() {
JnaJavaLibrary::new,
PosixCLibrary.class,
JnaPosixCLibrary::new,
Kernel32Library.class,
JnaKernel32Library::new,
SystemdLibrary.class,
JnaSystemdLibrary::new,
ZstdLibrary.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ private interface NativeFunctions extends Library {

int getrlimit(int resource, JnaRLimit rlimit);

int mlockall(int flags);

String strerror(int errno);
}

Expand All @@ -72,6 +74,11 @@ public int getrlimit(int resource, RLimit rlimit) {
return functions.getrlimit(resource, jnaRlimit);
}

@Override
public int mlockall(int flags) {
return functions.mlockall(flags);
}

@Override
public String strerror(int errno) {
return functions.strerror(errno);
Expand Down
1 change: 1 addition & 0 deletions libs/native/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
module org.elasticsearch.nativeaccess {
requires org.elasticsearch.base;
requires org.elasticsearch.logging;
requires java.management; // for access to heap size

exports org.elasticsearch.nativeaccess
to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ abstract class AbstractNativeAccess implements NativeAccess {
private final String name;
private final JavaLibrary javaLib;
private final Zstd zstd;
protected boolean isMemoryLocked = false;

protected AbstractNativeAccess(String name, NativeLibraryProvider libraryProvider) {
this.name = name;
Expand All @@ -47,4 +48,9 @@ public CloseableByteBuffer newBuffer(int len) {
assert len > 0;
return javaLib.newBuffer(len);
}

@Override
public boolean isMemoryLocked() {
return isMemoryLocked;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class LinuxNativeAccess extends PosixNativeAccess {
Systemd systemd;

LinuxNativeAccess(NativeLibraryProvider libraryProvider) {
super("Linux", libraryProvider, new PosixConstants(-1L, 9, 1));
super("Linux", libraryProvider, new PosixConstants(-1L, 9, 1, 8));
this.systemd = new Systemd(libraryProvider.getLibrary(SystemdLibrary.class));
}

Expand All @@ -34,4 +34,16 @@ protected long getMaxThreads() {
public Systemd systemd() {
return systemd;
}

@Override
protected void logMemoryLimitInstructions() {
// give specific instructions for the linux case to make it easy
String user = System.getProperty("user.name");
logger.warn("""
These can be adjusted by modifying /etc/security/limits.conf, for example:
\t# allow user '{}' mlockall
\t{} soft memlock unlimited
\t{} hard memlock unlimited""", user, user, user);
logger.warn("If you are logged in interactively, you will have to re-login for the new limits to take effect.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,16 @@
class MacNativeAccess extends PosixNativeAccess {

MacNativeAccess(NativeLibraryProvider libraryProvider) {
super("MacOS", libraryProvider, new PosixConstants(9223372036854775807L, 5, 1));
super("MacOS", libraryProvider, new PosixConstants(9223372036854775807L, 5, 1, 6));
}

@Override
protected long getMaxThreads() {
return ProcessLimits.UNKNOWN;
}

@Override
protected void logMemoryLimitInstructions() {
// we don't have instructions for macos
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ static NativeAccess instance() {
*/
ProcessLimits getProcessLimits();

/**
* Attempt to lock this process's virtual memory address space into physical RAM.
*/
void tryLockMemory();

/**
* Return whether locking memory was successful, or false otherwise.
*/
boolean isMemoryLocked();

Systemd systemd();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ public ProcessLimits getProcessLimits() {
return new ProcessLimits(ProcessLimits.UNKNOWN, ProcessLimits.UNKNOWN, ProcessLimits.UNKNOWN);
}

@Override
public void tryLockMemory() {
logger.warn("Cannot lock memory because native access is not available");
}

@Override
public boolean isMemoryLocked() {
return false;
}

@Override
public Systemd systemd() {
logger.warn("Cannot get systemd access because native access is not available");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@
/**
* Code constants on POSIX systems.
*/
record PosixConstants(long RLIMIT_INFINITY, int RLIMIT_AS, int RLIMIT_FSIZE) {}
record PosixConstants(long RLIMIT_INFINITY, int RLIMIT_AS, int RLIMIT_FSIZE, int RLIMIT_MEMLOCK) {}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

abstract class PosixNativeAccess extends AbstractNativeAccess {

public static final int MCL_CURRENT = 1;
public static final int ENOMEM = 12;

protected final PosixCLibrary libc;
protected final VectorSimilarityFunctions vectorDistance;
protected final PosixConstants constants;
Expand Down Expand Up @@ -73,11 +76,64 @@ public ProcessLimits getProcessLimits() {
return processLimits;
}

@Override
public void tryLockMemory() {
int result = libc.mlockall(MCL_CURRENT);
if (result == 0) {
isMemoryLocked = true;
return;
}

// mlockall failed for some reason
int errno = libc.errno();
String errMsg = libc.strerror(errno);
logger.warn("Unable to lock JVM Memory: error={}, reason={}", errno, errMsg);
logger.warn("This can result in part of the JVM being swapped out.");

if (errno == ENOMEM) {

boolean rlimitSuccess = false;
long softLimit = 0;
long hardLimit = 0;

// we only know RLIMIT_MEMLOCK for these two at the moment.
var rlimit = libc.newRLimit();
if (libc.getrlimit(constants.RLIMIT_MEMLOCK(), rlimit) == 0) {
rlimitSuccess = true;
softLimit = rlimit.rlim_cur();
hardLimit = rlimit.rlim_max();
} else {
logger.warn("Unable to retrieve resource limits: {}", libc.strerror(libc.errno()));
}

if (rlimitSuccess) {
logger.warn(
"Increase RLIMIT_MEMLOCK, soft limit: {}, hard limit: {}",
rlimitToString(softLimit),
rlimitToString(hardLimit)
);
logMemoryLimitInstructions();
} else {
logger.warn("Increase RLIMIT_MEMLOCK (ulimit).");
}
}
}

protected abstract void logMemoryLimitInstructions();

@Override
public Optional<VectorSimilarityFunctions> getVectorSimilarityFunctions() {
return Optional.ofNullable(vectorDistance);
}

String rlimitToString(long value) {
if (value == constants.RLIMIT_INFINITY()) {
return "unlimited";
} else {
return Long.toUnsignedString(value);
}
}

static boolean isNativeVectorLibSupported() {
return Runtime.version().feature() >= 21 && (isMacOrLinuxAarch64() || isLinuxAmd64()) && checkEnableSystemProperty();
}
Expand Down
Loading

0 comments on commit 062039b

Please sign in to comment.