Skip to content

Commit

Permalink
ARROW-6141: [C++] Enable memory-mapping a file region
Browse files Browse the repository at this point in the history
This patch adds an Open() overload to MemoryMappedFile that takes
length and offset parameters, so that users can memory-map a region
of a file, just as with mmap(). The new API is:

* MemoryMappedFile::Open(path, mode, length, offset, &mmap)

The original API is still available. Calling the original API
will memory map the whole file:

* MemoryMappedFile::Open(path, mode, &mmap)

A new field map_len_ is added in MemoryMappedFile::MemoryMap to
track the real memory map length.

MemoryMappedFile::Read()/ReadAt()/Write()/WriteAt() are also changed
to bound accesses by the memory map length, which matters for a region-based memory map.

Note that MemoryMappedFile::Resize() is not supported on a
region-based memory map.

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
  • Loading branch information
zhouyuan committed Aug 28, 2019
1 parent a40d6b6 commit 81f9246
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 13 deletions.
58 changes: 45 additions & 13 deletions cpp/src/arrow/io/file.cc
Expand Up @@ -395,7 +395,7 @@ class MemoryMappedFile::MemoryMap : public MutableBuffer {
~MemoryMap() {
ARROW_CHECK_OK(Close());
if (mutable_data_ != nullptr) {
int result = munmap(mutable_data_, static_cast<size_t>(size_));
int result = munmap(mutable_data_, static_cast<size_t>(map_len_));
ARROW_CHECK_EQ(result, 0) << "munmap failed";
}
}
Expand All @@ -412,7 +412,8 @@ class MemoryMappedFile::MemoryMap : public MutableBuffer {

bool closed() const { return !file_->is_open(); }

Status Open(const std::string& path, FileMode::type mode) {
Status Open(const std::string& path, FileMode::type mode, const int64_t length = 0,
const int64_t offset = 0) {
file_.reset(new OSFile());

if (mode != FileMode::READ) {
Expand All @@ -432,11 +433,12 @@ class MemoryMappedFile::MemoryMap : public MutableBuffer {

is_mutable_ = false;
}
map_len_ = offset_ = 0;

// Memory mapping fails when file size is 0
// delay it until the first resize
if (file_->size() > 0) {
RETURN_NOT_OK(InitMMap(file_->size()));
RETURN_NOT_OK(InitMMap(file_->size(), false, length, offset));
}

position_ = 0;
Expand All @@ -445,10 +447,14 @@ class MemoryMappedFile::MemoryMap : public MutableBuffer {
}

// Resize the mmap and file to the specified size.
// Resize on memory mapped file region is not supported.
Status Resize(const int64_t new_size) {
if (!writable()) {
return Status::IOError("Cannot resize a readonly memory map");
}
if (map_len_ != size_) {
return Status::IOError("Cannot resize a partial memory map");
}

if (new_size == 0) {
if (mutable_data_ != nullptr) {
Expand All @@ -458,7 +464,7 @@ class MemoryMappedFile::MemoryMap : public MutableBuffer {
}
RETURN_NOT_OK(internal::FileTruncate(file_->fd(), 0));
data_ = mutable_data_ = nullptr;
size_ = capacity_ = 0;
map_len_ = offset_ = size_ = capacity_ = 0;
}
position_ = 0;
return Status::OK();
Expand All @@ -468,7 +474,8 @@ class MemoryMappedFile::MemoryMap : public MutableBuffer {
void* result;
RETURN_NOT_OK(
internal::MemoryMapRemap(mutable_data_, size_, new_size, file_->fd(), &result));
size_ = capacity_ = new_size;
map_len_ = size_ = capacity_ = new_size;
offset_ = 0;
data_ = mutable_data_ = static_cast<uint8_t*>(result);
if (position_ > size_) {
position_ = size_;
Expand All @@ -484,6 +491,8 @@ class MemoryMappedFile::MemoryMap : public MutableBuffer {

int64_t size() const { return size_; }

int64_t maplen() const { return map_len_; }

Status Seek(int64_t position) {
if (position < 0) {
return Status::Invalid("position is out of bounds");
Expand All @@ -510,16 +519,25 @@ class MemoryMappedFile::MemoryMap : public MutableBuffer {

private:
// Initialize the mmap and set size, capacity and the data pointers
Status InitMMap(int64_t initial_size, bool resize_file = false) {
Status InitMMap(int64_t initial_size, bool resize_file = false,
const int64_t length = 0, const int64_t offset = 0) {
if (resize_file) {
RETURN_NOT_OK(internal::FileTruncate(file_->fd(), initial_size));
}
DCHECK(data_ == nullptr && mutable_data_ == nullptr);
void* result = mmap(nullptr, static_cast<size_t>(initial_size), prot_flags_,
map_mode_, file_->fd(), 0);

size_t mmap_length = static_cast<size_t>(initial_size);
if (length > 0 && length < initial_size) {
mmap_length = static_cast<size_t>(length);
}

void* result = mmap(nullptr, mmap_length, prot_flags_, map_mode_, file_->fd(),
static_cast<off_t>(offset));
if (result == MAP_FAILED) {
return Status::IOError("Memory mapping file failed: ", std::strerror(errno));
}
map_len_ = mmap_length;
offset_ = offset;
size_ = capacity_ = initial_size;
data_ = mutable_data_ = static_cast<uint8_t*>(result);

Expand All @@ -529,6 +547,8 @@ class MemoryMappedFile::MemoryMap : public MutableBuffer {
int prot_flags_;
int map_mode_;
int64_t position_;
int64_t offset_;
int64_t map_len_;
std::mutex resize_lock_;
};

Expand Down Expand Up @@ -557,6 +577,18 @@ Status MemoryMappedFile::Open(const std::string& path, FileMode::type mode,
return Status::OK();
}

// Open `path` in the given `mode` and memory-map it with the requested
// `length` and `offset`, handing the new MemoryMappedFile back through `out`.
// Any non-OK status from MemoryMap::Open() is returned before `*out` is
// assigned.
Status MemoryMappedFile::Open(const std::string& path, FileMode::type mode,
                              const int64_t length, const int64_t offset,
                              std::shared_ptr<MemoryMappedFile>* out) {
  std::shared_ptr<MemoryMappedFile> mapped_file(new MemoryMappedFile());
  mapped_file->memory_map_.reset(new MemoryMap());

  // `length` and `offset` are forwarded unchanged to the underlying map.
  RETURN_NOT_OK(mapped_file->memory_map_->Open(path, mode, length, offset));

  *out = mapped_file;
  return Status::OK();
}

Status MemoryMappedFile::GetSize(int64_t* size) const {
*size = memory_map_->size();
return Status::OK();
Expand Down Expand Up @@ -585,7 +617,7 @@ Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes,
auto guard_resize = memory_map_->writable()
? std::unique_lock<std::mutex>(memory_map_->resize_lock())
: std::unique_lock<std::mutex>();
nbytes = std::max<int64_t>(0, std::min(nbytes, memory_map_->size() - position));
nbytes = std::max<int64_t>(0, std::min(nbytes, memory_map_->maplen() - position));

if (nbytes > 0) {
*out = SliceBuffer(memory_map_, position, nbytes);
Expand All @@ -600,7 +632,7 @@ Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes
auto guard_resize = memory_map_->writable()
? std::unique_lock<std::mutex>(memory_map_->resize_lock())
: std::unique_lock<std::mutex>();
nbytes = std::max<int64_t>(0, std::min(nbytes, memory_map_->size() - position));
nbytes = std::max<int64_t>(0, std::min(nbytes, memory_map_->maplen() - position));
if (nbytes > 0) {
memcpy(out, memory_map_->data() + position, static_cast<size_t>(nbytes));
}
Expand Down Expand Up @@ -628,12 +660,12 @@ Status MemoryMappedFile::WriteAt(int64_t position, const void* data, int64_t nby
if (!memory_map_->opened() || !memory_map_->writable()) {
return Status::IOError("Unable to write");
}
if (position + nbytes > memory_map_->size()) {
if (position + nbytes > memory_map_->maplen()) {
return Status::Invalid("Cannot write past end of memory map");
}

RETURN_NOT_OK(memory_map_->Seek(position));
if (nbytes + memory_map_->position() > memory_map_->size()) {
if (nbytes + memory_map_->position() > memory_map_->maplen()) {
return Status::Invalid("Cannot write past end of memory map");
}

Expand All @@ -646,7 +678,7 @@ Status MemoryMappedFile::Write(const void* data, int64_t nbytes) {
if (!memory_map_->opened() || !memory_map_->writable()) {
return Status::IOError("Unable to write");
}
if (nbytes + memory_map_->position() > memory_map_->size()) {
if (nbytes + memory_map_->position() > memory_map_->maplen()) {
return Status::Invalid("Cannot write past end of memory map");
}

Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/io/file.h
Expand Up @@ -185,9 +185,14 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface {
static Status Create(const std::string& path, int64_t size,
std::shared_ptr<MemoryMappedFile>* out);

// Open the file at `path` in the given `mode` and memory-map the whole file.
// The resulting object is stored in `*out` on success.
static Status Open(const std::string& path, FileMode::type mode,
std::shared_ptr<MemoryMappedFile>* out);

// Open the file at `path` in the given `mode` and memory-map only a region
// of it, similar to calling mmap() with an explicit length and offset.
//
// `length`: number of bytes to map. The value is honoured only when it is
//           positive and smaller than the file size; otherwise the file
//           size is used as the mapping length.
// `offset`: byte offset into the file where the mapping starts. It must be
//           a multiple of the page size, otherwise mapping fails with an
//           IOError.
// `out`:    receives the new MemoryMappedFile on success.
//
// Resize() is not supported on a map that covers less than the whole file,
// and reads/writes are bounded by the mapped length.
static Status Open(const std::string& path, FileMode::type mode, const int64_t length,
const int64_t offset, std::shared_ptr<MemoryMappedFile>* out);

Status Close() override;

bool closed() const override;
Expand Down
35 changes: 35 additions & 0 deletions cpp/src/arrow/io/file_test.cc
Expand Up @@ -635,6 +635,41 @@ TEST_F(TestMemoryMappedFile, ZeroSizeFlie) {
ASSERT_EQ(0, size);
}

// Exercises the region-based Open() overload: rejects an unaligned offset,
// maps the second half of an 8192-byte file, round-trips data through the
// mapped region, and verifies that resizing and out-of-range writes fail.
TEST_F(TestMemoryMappedFile, MapPartFile) {
  const int64_t buffer_size = 1024;
  const int reps = 8;
  // NOTE(review): assumes 4096 is a multiple of the system page size and
  // 1024 is not -- confirm on platforms with larger pages.
  const int64_t region_length = 4096;
  const int64_t aligned_offset = 4096;
  const int64_t unaligned_offset = 1024;

  const std::string path = "io-memory-map-offset";

  std::vector<uint8_t> payload(buffer_size);
  random_bytes(buffer_size, 0, payload.data());

  // Backing file of reps * buffer_size = 8192 bytes.
  CreateFile(path, reps * buffer_size);

  std::shared_ptr<MemoryMappedFile> mapped;

  // An offset that is not page-aligned must be rejected.
  ASSERT_RAISES(IOError, MemoryMappedFile::Open(path, FileMode::READWRITE, region_length,
                                                unaligned_offset, &mapped));

  // Map file bytes [4096, 8192).
  ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READWRITE, region_length,
                                   aligned_offset, &mapped));

  // Data written at the start of the region reads back unchanged.
  ASSERT_OK(mapped->Write(payload.data(), buffer_size));
  std::shared_ptr<Buffer> read_back;
  ASSERT_OK(mapped->ReadAt(0, buffer_size, &read_back));
  ASSERT_EQ(0, memcmp(read_back->data(), payload.data(), buffer_size));

  // A partial map cannot be resized.
  ASSERT_RAISES(IOError, mapped->Resize(region_length));

  // Writing at the end of the mapped length is out of bounds.
  ASSERT_RAISES(Invalid, mapped->WriteAt(region_length, payload.data(), buffer_size));
}

TEST_F(TestMemoryMappedFile, WriteRead) {
const int64_t buffer_size = 1024;
std::vector<uint8_t> buffer(buffer_size);
Expand Down

0 comments on commit 81f9246

Please sign in to comment.