diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc index 5abf0604ee492..5b9de671a453a 100644 --- a/cpp/src/arrow/io/file.cc +++ b/cpp/src/arrow/io/file.cc @@ -395,7 +395,7 @@ class MemoryMappedFile::MemoryMap : public MutableBuffer { ~MemoryMap() { ARROW_CHECK_OK(Close()); if (mutable_data_ != nullptr) { - int result = munmap(mutable_data_, static_cast(size_)); + int result = munmap(mutable_data_, static_cast(map_len_)); ARROW_CHECK_EQ(result, 0) << "munmap failed"; } } @@ -412,7 +412,8 @@ class MemoryMappedFile::MemoryMap : public MutableBuffer { bool closed() const { return !file_->is_open(); } - Status Open(const std::string& path, FileMode::type mode) { + Status Open(const std::string& path, FileMode::type mode, const int64_t length = 0, + const int64_t offset = 0) { file_.reset(new OSFile()); if (mode != FileMode::READ) { @@ -432,11 +433,12 @@ class MemoryMappedFile::MemoryMap : public MutableBuffer { is_mutable_ = false; } + map_len_ = offset_ = 0; // Memory mapping fails when file size is 0 // delay it until the first resize if (file_->size() > 0) { - RETURN_NOT_OK(InitMMap(file_->size())); + RETURN_NOT_OK(InitMMap(file_->size(), false, length, offset)); } position_ = 0; @@ -445,10 +447,14 @@ class MemoryMappedFile::MemoryMap : public MutableBuffer { } // Resize the mmap and file to the specified size. + // Resizing a partially mapped file (a region mapping) is not supported. 
Status Resize(const int64_t new_size) { if (!writable()) { return Status::IOError("Cannot resize a readonly memory map"); } + if (map_len_ != size_) { + return Status::IOError("Cannot resize a partial memory map"); + } if (new_size == 0) { if (mutable_data_ != nullptr) { @@ -458,7 +464,7 @@ class MemoryMappedFile::MemoryMap : public MutableBuffer { } RETURN_NOT_OK(internal::FileTruncate(file_->fd(), 0)); data_ = mutable_data_ = nullptr; - size_ = capacity_ = 0; + map_len_ = offset_ = size_ = capacity_ = 0; } position_ = 0; return Status::OK(); @@ -468,7 +474,8 @@ class MemoryMappedFile::MemoryMap : public MutableBuffer { void* result; RETURN_NOT_OK( internal::MemoryMapRemap(mutable_data_, size_, new_size, file_->fd(), &result)); - size_ = capacity_ = new_size; + map_len_ = size_ = capacity_ = new_size; + offset_ = 0; data_ = mutable_data_ = static_cast(result); if (position_ > size_) { position_ = size_; @@ -484,6 +491,8 @@ class MemoryMappedFile::MemoryMap : public MutableBuffer { int64_t size() const { return size_; } + int64_t maplen() const { return map_len_; } + Status Seek(int64_t position) { if (position < 0) { return Status::Invalid("position is out of bounds"); @@ -510,16 +519,25 @@ class MemoryMappedFile::MemoryMap : public MutableBuffer { private: // Initialize the mmap and set size, capacity and the data pointers - Status InitMMap(int64_t initial_size, bool resize_file = false) { + Status InitMMap(int64_t initial_size, bool resize_file = false, + const int64_t length = 0, const int64_t offset = 0) { if (resize_file) { RETURN_NOT_OK(internal::FileTruncate(file_->fd(), initial_size)); } DCHECK(data_ == nullptr && mutable_data_ == nullptr); - void* result = mmap(nullptr, static_cast(initial_size), prot_flags_, - map_mode_, file_->fd(), 0); + + size_t mmap_length = static_cast(initial_size); + if (length > 0 && length < initial_size) { + mmap_length = static_cast(length); + } + + void* result = mmap(nullptr, mmap_length, prot_flags_, map_mode_, file_->fd(), 
+ static_cast(offset)); if (result == MAP_FAILED) { return Status::IOError("Memory mapping file failed: ", std::strerror(errno)); } + map_len_ = mmap_length; + offset_ = offset; size_ = capacity_ = initial_size; data_ = mutable_data_ = static_cast(result); @@ -529,6 +547,8 @@ class MemoryMappedFile::MemoryMap : public MutableBuffer { int prot_flags_; int map_mode_; int64_t position_; + int64_t offset_; + int64_t map_len_; std::mutex resize_lock_; }; @@ -557,6 +577,18 @@ Status MemoryMappedFile::Open(const std::string& path, FileMode::type mode, return Status::OK(); } +Status MemoryMappedFile::Open(const std::string& path, FileMode::type mode, + const int64_t length, const int64_t offset, + std::shared_ptr* out) { + std::shared_ptr result(new MemoryMappedFile()); + + result->memory_map_.reset(new MemoryMap()); + RETURN_NOT_OK(result->memory_map_->Open(path, mode, length, offset)); + + *out = result; + return Status::OK(); +} + Status MemoryMappedFile::GetSize(int64_t* size) const { *size = memory_map_->size(); return Status::OK(); @@ -585,7 +617,7 @@ Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes, auto guard_resize = memory_map_->writable() ? std::unique_lock(memory_map_->resize_lock()) : std::unique_lock(); - nbytes = std::max(0, std::min(nbytes, memory_map_->size() - position)); + nbytes = std::max(0, std::min(nbytes, memory_map_->maplen() - position)); if (nbytes > 0) { *out = SliceBuffer(memory_map_, position, nbytes); @@ -600,7 +632,7 @@ Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes auto guard_resize = memory_map_->writable() ? 
std::unique_lock(memory_map_->resize_lock()) : std::unique_lock(); - nbytes = std::max(0, std::min(nbytes, memory_map_->size() - position)); + nbytes = std::max(0, std::min(nbytes, memory_map_->maplen() - position)); if (nbytes > 0) { memcpy(out, memory_map_->data() + position, static_cast(nbytes)); } @@ -628,12 +660,12 @@ Status MemoryMappedFile::WriteAt(int64_t position, const void* data, int64_t nby if (!memory_map_->opened() || !memory_map_->writable()) { return Status::IOError("Unable to write"); } - if (position + nbytes > memory_map_->size()) { + if (position + nbytes > memory_map_->maplen()) { return Status::Invalid("Cannot write past end of memory map"); } RETURN_NOT_OK(memory_map_->Seek(position)); - if (nbytes + memory_map_->position() > memory_map_->size()) { + if (nbytes + memory_map_->position() > memory_map_->maplen()) { return Status::Invalid("Cannot write past end of memory map"); } @@ -646,7 +678,7 @@ Status MemoryMappedFile::Write(const void* data, int64_t nbytes) { if (!memory_map_->opened() || !memory_map_->writable()) { return Status::IOError("Unable to write"); } - if (nbytes + memory_map_->position() > memory_map_->size()) { + if (nbytes + memory_map_->position() > memory_map_->maplen()) { return Status::Invalid("Cannot write past end of memory map"); } diff --git a/cpp/src/arrow/io/file.h b/cpp/src/arrow/io/file.h index e9ac13f4c6a39..694f0b96d991c 100644 --- a/cpp/src/arrow/io/file.h +++ b/cpp/src/arrow/io/file.h @@ -185,9 +185,14 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface { static Status Create(const std::string& path, int64_t size, std::shared_ptr* out); + // Memory-map the whole file static Status Open(const std::string& path, FileMode::type mode, std::shared_ptr* out); + // Memory-map a region of the file; offset must be a multiple of the page size + static Status Open(const std::string& path, FileMode::type mode, const int64_t length, + const int64_t offset, std::shared_ptr* out); + Status Close() override; 
bool closed() const override; diff --git a/cpp/src/arrow/io/file_test.cc b/cpp/src/arrow/io/file_test.cc index c73bbe4de784a..c8d48d58b9871 100644 --- a/cpp/src/arrow/io/file_test.cc +++ b/cpp/src/arrow/io/file_test.cc @@ -635,6 +635,41 @@ TEST_F(TestMemoryMappedFile, ZeroSizeFlie) { ASSERT_EQ(0, size); } +TEST_F(TestMemoryMappedFile, MapPartFile) { + const int64_t buffer_size = 1024; + const int64_t unalign_offset = 1024; + const int64_t offset = 4096; + std::vector buffer(buffer_size); + + random_bytes(1024, 0, buffer.data()); + + const int reps = 8; + + std::string path = "io-memory-map-offset"; + std::shared_ptr result; + + // File size = reps * buffer_size = 8192 bytes + CreateFile(path, reps * buffer_size); + + // Mapping must fail when the offset is not page-aligned + ASSERT_RAISES(IOError, MemoryMappedFile::Open(path, FileMode::READWRITE, 4096, + unalign_offset, &result)); + + // Map the file region [4096, 8192) + ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READWRITE, 4096, offset, &result)); + + std::shared_ptr out_buffer; + ASSERT_OK(result->Write(buffer.data(), buffer_size)); + ASSERT_OK(result->ReadAt(0, buffer_size, &out_buffer)); + ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size)); + + // Resizing a partial mapping is not supported + ASSERT_RAISES(IOError, result->Resize(4096)); + + // Writing past the end of the mapped region must fail + ASSERT_RAISES(Invalid, result->WriteAt(4096, buffer.data(), buffer_size)); +} + TEST_F(TestMemoryMappedFile, WriteRead) { const int64_t buffer_size = 1024; std::vector buffer(buffer_size);