Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ci/travis_script_python.sh
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ if [ $ARROW_TRAVIS_VALGRIND == "1" ]; then
export PLASMA_VALGRIND=1
fi

# Set up huge pages for plasma test.
# Linux only: mounts a hugetlbfs at /mnt/hugepages owned by the current
# user/group, lets that group use huge-page shared memory, and asks the
# kernel for 20000 huge pages.
# TRAVIS_OS_NAME is quoted so that an empty or unset value does not turn
# the test into a syntax error.
if [ "$TRAVIS_OS_NAME" == "linux" ]; then
  sudo mkdir -p /mnt/hugepages
  sudo mount -t hugetlbfs -o uid="$(id -u)" -o gid="$(id -g)" none /mnt/hugepages
  # $(id -g) is expanded by the calling (non-root) shell before sudo runs,
  # so the invoking user's group id is written, not root's.
  sudo bash -c "echo $(id -g) > /proc/sys/vm/hugetlb_shm_group"
  sudo bash -c "echo 20000 > /proc/sys/vm/nr_hugepages"
fi

PYARROW_PATH=$CONDA_PREFIX/lib/python$PYTHON_VERSION/site-packages/pyarrow
python -m pytest -vv -r sxX --durations=15 -s $PYARROW_PATH --parquet

Expand Down
11 changes: 8 additions & 3 deletions cpp/src/plasma/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "plasma/common.h"
#include "plasma/fling.h"
#include "plasma/io.h"
#include "plasma/malloc.h"
#include "plasma/plasma.h"
#include "plasma/protocol.h"

Expand Down Expand Up @@ -117,8 +118,10 @@ uint8_t* PlasmaClient::lookup_or_mmap(int fd, int store_fd_val, int64_t map_size
close(fd);
return entry->second.pointer;
} else {
uint8_t* result = reinterpret_cast<uint8_t*>(
mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0));
// We subtract kMmapRegionsGap, which fake_mmap (malloc.cc) added to the
// length, to make map_size page-aligned again.
uint8_t* result = reinterpret_cast<uint8_t*>(mmap(
NULL, map_size - kMmapRegionsGap, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0));
// TODO(pcm): Don't fail here, instead return a Status.
if (result == MAP_FAILED) {
ARROW_LOG(FATAL) << "mmap failed";
Expand Down Expand Up @@ -395,7 +398,9 @@ Status PlasmaClient::UnmapObject(const ObjectID& object_id) {
ARROW_CHECK(entry->second.count >= 1);
if (entry->second.count == 1) {
// If no other objects are being used, then unmap the file.
int err = munmap(entry->second.pointer, entry->second.length);
// We subtract kMmapRegionsGap, which fake_mmap (malloc.cc) added to the
// length, to make the size page-aligned again.
int err = munmap(entry->second.pointer, entry->second.length - kMmapRegionsGap);
if (err == -1) {
return Status::IOError("Error during munmap");
}
Expand Down
10 changes: 5 additions & 5 deletions cpp/src/plasma/malloc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,10 @@ int create_buffer(int64_t size) {
}

void* fake_mmap(size_t size) {
// Add sizeof(size_t) so that the returned pointer is deliberately not
// Add kMmapRegionsGap so that the returned pointer is deliberately not
// page-aligned. This ensures that the segments of memory returned by
// fake_mmap are never contiguous.
size += sizeof(size_t);
size += kMmapRegionsGap;

int fd = create_buffer(size);
ARROW_CHECK(fd >= 0) << "Failed to create buffer during mmap";
Expand All @@ -155,15 +155,15 @@ void* fake_mmap(size_t size) {
record.size = size;

// We lie to dlmalloc about where mapped memory actually lives.
pointer = pointer_advance(pointer, sizeof(size_t));
pointer = pointer_advance(pointer, kMmapRegionsGap);
ARROW_LOG(DEBUG) << pointer << " = fake_mmap(" << size << ")";
return pointer;
}

int fake_munmap(void* addr, int64_t size) {
ARROW_LOG(DEBUG) << "fake_munmap(" << addr << ", " << size << ")";
addr = pointer_retreat(addr, sizeof(size_t));
size += sizeof(size_t);
addr = pointer_retreat(addr, kMmapRegionsGap);
size += kMmapRegionsGap;

auto entry = mmap_records.find(addr);

Expand Down
6 changes: 6 additions & 0 deletions cpp/src/plasma/malloc.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@
#include <inttypes.h>
#include <stddef.h>

/// Gap between two consecutive mmap regions allocated by fake_mmap.
/// This ensures that the segments of memory returned by
/// fake_mmap are never contiguous, so dlmalloc does not coalesce them
/// (in the client we cannot guarantee that these mmaps are contiguous).
constexpr int64_t kMmapRegionsGap = sizeof(size_t);

void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_length, ptrdiff_t* offset);

/// Get the mmap size corresponding to a specific file descriptor.
Expand Down
18 changes: 17 additions & 1 deletion python/pyarrow/tests/test_plasma.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ def assert_get_object_equal(unit_test, client1, client2, object_id,
def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
use_valgrind=False, use_profiler=False,
stdout_file=None, stderr_file=None,
use_one_memory_mapped_file=False):
use_one_memory_mapped_file=False,
plasma_directory=None, use_hugepages=False):
"""Start a plasma store process.
Args:
use_valgrind (bool): True if the plasma store should be started inside
Expand All @@ -131,6 +132,10 @@ def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
"-m", str(plasma_store_memory)]
if use_one_memory_mapped_file:
command += ["-f"]
if plasma_directory:
command += ["-d", plasma_directory]
if use_hugepages:
command += ["-h"]
if use_valgrind:
pid = subprocess.Popen(["valgrind",
"--track-origins=yes",
Expand Down Expand Up @@ -762,3 +767,14 @@ def test_object_id_size():
with pytest.raises(ValueError):
plasma.ObjectID("hello")
plasma.ObjectID(20 * b"0")


@pytest.mark.skipif(not os.path.exists("/mnt/hugepages"),
                    reason="requires hugepage support")
def test_use_huge_pages():
    """Start a plasma store with huge pages enabled and create an object.

    The store is started with ``plasma_directory="/mnt/hugepages"`` and
    ``use_hugepages=True``; a client then connects and creates one object
    of size 100000000. The test is skipped when /mnt/hugepages does not
    exist.
    """
    import pyarrow.plasma as plasma
    plasma_store_name, p = start_plasma_store(
        plasma_directory="/mnt/hugepages", use_hugepages=True)
    try:
        plasma_client = plasma.connect(plasma_store_name, "", 64)
        create_object(plasma_client, 100000000)
    finally:
        # Always stop the store, even when connect/create fails, so a
        # failing test does not leave a stray store process behind.
        p.kill()