Skip to content

Commit

Permalink
ARROW-7995: [C++] Add facility to coalesce and cache reads
Browse files Browse the repository at this point in the history
Closes #6528 from pitrou/ARROW-7995-read-combining and squashes the following commits:

3bfb655 <Antoine Pitrou> Address review comments
593335d <Antoine Pitrou> Enable shared_from_this() on RandomAccessFile
c0aac43 <Antoine Pitrou> Add ReadAsync overrides
b8ce450 <Antoine Pitrou> Fix hang on Windows
489c2fa <Antoine Pitrou> ARROW-7995:  Add facility to coalesce and cache reads

Authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Wes McKinney <wesm+git@apache.org>
  • Loading branch information
pitrou authored and wesm committed Mar 5, 2020
1 parent 66b05ab commit b785fdc
Show file tree
Hide file tree
Showing 16 changed files with 591 additions and 33 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Expand Up @@ -152,6 +152,7 @@ set(ARROW_SRCS
visitor.cc
c/bridge.cc
io/buffered.cc
io/caching.cc
io/compressed.cc
io/file.cc
io/hdfs.cc
Expand Down
106 changes: 106 additions & 0 deletions cpp/src/arrow/io/caching.cc
@@ -0,0 +1,106 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <algorithm>
#include <utility>

#include "arrow/buffer.h"
#include "arrow/io/caching.h"
#include "arrow/io/util_internal.h"
#include "arrow/result.h"
#include "arrow/util/future.h"
#include "arrow/util/logging.h"

namespace arrow {
namespace io {
namespace internal {

struct RangeCacheEntry {
ReadRange range;
Future<std::shared_ptr<Buffer>> future;

friend bool operator<(const RangeCacheEntry& left, const RangeCacheEntry& right) {
return left.range.offset < right.range.offset;
}
};

struct ReadRangeCache::Impl {
std::shared_ptr<RandomAccessFile> file;
int64_t hole_size_limit;
int64_t range_size_limit;

// Ordered by offset (so as to find a matching region by binary search)
std::vector<RangeCacheEntry> entries;

// Add new entries, themselves ordered by offset
void AddEntries(std::vector<RangeCacheEntry> new_entries) {
if (entries.size() > 0) {
std::vector<RangeCacheEntry> merged(entries.size() + new_entries.size());
std::merge(entries.begin(), entries.end(), new_entries.begin(), new_entries.end(),
merged.begin());
entries = std::move(merged);
} else {
entries = std::move(new_entries);
}
}
};

ReadRangeCache::ReadRangeCache(std::shared_ptr<RandomAccessFile> file,
int64_t hole_size_limit, int64_t range_size_limit)
: impl_(new Impl()) {
impl_->file = std::move(file);
impl_->hole_size_limit = hole_size_limit;
impl_->range_size_limit = range_size_limit;
}

ReadRangeCache::~ReadRangeCache() {}

Status ReadRangeCache::Cache(std::vector<ReadRange> ranges) {
ranges = internal::CoalesceReadRanges(std::move(ranges), impl_->hole_size_limit,
impl_->range_size_limit);
std::vector<RangeCacheEntry> entries;
entries.reserve(ranges.size());
for (const auto& range : ranges) {
auto fut = impl_->file->ReadAsync(range.offset, range.length);
entries.push_back({range, std::move(fut)});
}

impl_->AddEntries(std::move(entries));
return Status::OK();
}

Result<std::shared_ptr<Buffer>> ReadRangeCache::Read(ReadRange range) {
if (range.length == 0) {
static const uint8_t byte = 0;
return std::make_shared<Buffer>(&byte, 0);
}

const auto it = std::lower_bound(
impl_->entries.begin(), impl_->entries.end(), range,
[](const RangeCacheEntry& entry, const ReadRange& range) {
return entry.range.offset + entry.range.length < range.offset + range.length;
});
if (it != impl_->entries.end() && it->range.Contains(range)) {
ARROW_ASSIGN_OR_RAISE(auto buf, it->future.result());
return SliceBuffer(std::move(buf), range.offset - it->range.offset, range.length);
}
return Status::Invalid("ReadRangeCache did not find matching cache entry");
}

} // namespace internal
} // namespace io
} // namespace arrow
71 changes: 71 additions & 0 deletions cpp/src/arrow/io/caching.h
@@ -0,0 +1,71 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstdint>
#include <memory>
#include <string>
#include <vector>

#include "arrow/io/interfaces.h"
#include "arrow/util/visibility.h"

namespace arrow {
namespace io {
namespace internal {

/// \brief A read cache designed to hide IO latencies when reading.
///
/// To use this, you must first pass it the ranges you'll need in the future.
/// The cache will combine those ranges according to parameters (see constructor)
/// and start fetching the combined ranges in the background.
/// You can then individually fetch them using Read().
class ARROW_EXPORT ReadRangeCache {
public:
static constexpr int64_t kDefaultHoleSizeLimit = 8192;
static constexpr int64_t kDefaultRangeSizeLimit = 32 * 1024 * 1024;

/// Construct a read cache
///
/// \param[in] hole_size_limit The maximum distance in bytes between two
/// consecutive ranges; beyond this value, ranges are not combined
/// \param[in] range_size_limit The maximum size in bytes of a combined range;
/// if combining two consecutive ranges would produce a range of a size
/// greater than this, they are not combined
explicit ReadRangeCache(std::shared_ptr<RandomAccessFile> file,
int64_t hole_size_limit = kDefaultHoleSizeLimit,
int64_t range_size_limit = kDefaultRangeSizeLimit);
~ReadRangeCache();

/// \brief Cache the given ranges in the background.
///
/// The caller must ensure that the ranges do not overlap with each other,
/// nor with previously cached ranges. Otherwise, behaviour will be undefined.
Status Cache(std::vector<ReadRange> ranges);

/// \brief Read a range previously given to Cache().
Result<std::shared_ptr<Buffer>> Read(ReadRange range);

protected:
struct Impl;
std::unique_ptr<Impl> impl_;
};

} // namespace internal
} // namespace io
} // namespace arrow
19 changes: 13 additions & 6 deletions cpp/src/arrow/io/file.cc
Expand Up @@ -39,6 +39,7 @@
#include <mutex>
#include <sstream>
#include <string>
#include <utility>

// ----------------------------------------------------------------------
// Other Arrow includes
Expand All @@ -50,6 +51,7 @@
#include "arrow/buffer.h"
#include "arrow/memory_pool.h"
#include "arrow/status.h"
#include "arrow/util/future.h"
#include "arrow/util/io_util.h"
#include "arrow/util/logging.h"

Expand Down Expand Up @@ -145,7 +147,7 @@ class OSFile {

Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) {
RETURN_NOT_OK(CheckClosed());
RETURN_NOT_OK(internal::ValidateRegion(position, nbytes));
RETURN_NOT_OK(internal::ValidateRange(position, nbytes));
// ReadAt() leaves the file position undefined, so require that we seek
// before calling Read() or Write().
need_seeking_.store(true);
Expand Down Expand Up @@ -696,7 +698,7 @@ Result<std::shared_ptr<Buffer>> MemoryMappedFile::ReadAt(int64_t position,
: std::unique_lock<std::mutex>();

ARROW_ASSIGN_OR_RAISE(
nbytes, internal::ValidateReadRegion(position, nbytes, memory_map_->size()));
nbytes, internal::ValidateReadRange(position, nbytes, memory_map_->size()));
return memory_map_->Slice(position, nbytes);
}

Expand All @@ -706,7 +708,7 @@ Result<int64_t> MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes, void*
? std::unique_lock<std::mutex>(memory_map_->resize_lock())
: std::unique_lock<std::mutex>();
ARROW_ASSIGN_OR_RAISE(
nbytes, internal::ValidateReadRegion(position, nbytes, memory_map_->size()));
nbytes, internal::ValidateReadRange(position, nbytes, memory_map_->size()));
if (nbytes > 0) {
memcpy(out, memory_map_->data() + position, static_cast<size_t>(nbytes));
}
Expand All @@ -727,6 +729,11 @@ Result<std::shared_ptr<Buffer>> MemoryMappedFile::Read(int64_t nbytes) {
return buffer;
}

Future<std::shared_ptr<Buffer>> MemoryMappedFile::ReadAsync(int64_t position,
int64_t nbytes) {
return Future<std::shared_ptr<Buffer>>::MakeFinished(ReadAt(position, nbytes));
}

bool MemoryMappedFile::supports_zero_copy() const { return true; }

Status MemoryMappedFile::WriteAt(int64_t position, const void* data, int64_t nbytes) {
Expand All @@ -736,7 +743,7 @@ Status MemoryMappedFile::WriteAt(int64_t position, const void* data, int64_t nby
if (!memory_map_->opened() || !memory_map_->writable()) {
return Status::IOError("Unable to write");
}
RETURN_NOT_OK(internal::ValidateWriteRegion(position, nbytes, memory_map_->size()));
RETURN_NOT_OK(internal::ValidateWriteRange(position, nbytes, memory_map_->size()));

RETURN_NOT_OK(memory_map_->Seek(position));
return WriteInternal(data, nbytes);
Expand All @@ -749,8 +756,8 @@ Status MemoryMappedFile::Write(const void* data, int64_t nbytes) {
if (!memory_map_->opened() || !memory_map_->writable()) {
return Status::IOError("Unable to write");
}
RETURN_NOT_OK(internal::ValidateWriteRegion(memory_map_->position(), nbytes,
memory_map_->size()));
RETURN_NOT_OK(
internal::ValidateWriteRange(memory_map_->position(), nbytes, memory_map_->size()));

return WriteInternal(data, nbytes);
}
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/io/file.h
Expand Up @@ -221,6 +221,9 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface {
// zero copy method
Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override;

// Synchronous ReadAsync override
Future<std::shared_ptr<Buffer>> ReadAsync(int64_t position, int64_t nbytes) override;

bool supports_zero_copy() const override;

/// Write data at the current position in the file. Thread-safe
Expand Down
34 changes: 33 additions & 1 deletion cpp/src/arrow/io/file_test.cc
Expand Up @@ -41,6 +41,7 @@
#include "arrow/status.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/util.h"
#include "arrow/util/future.h"
#include "arrow/util/io_util.h"

namespace arrow {
Expand Down Expand Up @@ -364,6 +365,18 @@ TEST_F(TestReadableFile, ReadAt) {
ASSERT_RAISES(Invalid, file_->ReadAt(0, 1));
}

TEST_F(TestReadableFile, ReadAsync) {
MakeTestFile();
OpenFile();

auto fut1 = file_->ReadAsync(1, 10);
auto fut2 = file_->ReadAsync(0, 4);
ASSERT_OK_AND_ASSIGN(auto buf1, fut1.result());
ASSERT_OK_AND_ASSIGN(auto buf2, fut2.result());
AssertBufferEqual(*buf1, "estdata");
AssertBufferEqual(*buf2, "test");
}

TEST_F(TestReadableFile, SeekingRequired) {
MakeTestFile();
OpenFile();
Expand Down Expand Up @@ -594,7 +607,6 @@ TEST_F(TestMemoryMappedFile, MapPartFile) {
TEST_F(TestMemoryMappedFile, WriteRead) {
const int64_t buffer_size = 1024;
std::vector<uint8_t> buffer(buffer_size);

random_bytes(1024, 0, buffer.data());

const int reps = 5;
Expand All @@ -613,6 +625,26 @@ TEST_F(TestMemoryMappedFile, WriteRead) {
}
}

TEST_F(TestMemoryMappedFile, ReadAsync) {
const int64_t buffer_size = 1024;
std::vector<uint8_t> buffer(buffer_size);
random_bytes(1024, 0, buffer.data());

std::string path = "io-memory-map-read-async-test";
ASSERT_OK_AND_ASSIGN(auto mmap, InitMemoryMap(buffer_size, path));
ASSERT_OK(mmap->Write(buffer.data(), buffer_size));

auto fut1 = mmap->ReadAsync(1, 1000);
auto fut2 = mmap->ReadAsync(3, 4);
ASSERT_EQ(fut1.state(), FutureState::SUCCESS);
ASSERT_EQ(fut2.state(), FutureState::SUCCESS);
ASSERT_OK_AND_ASSIGN(auto buf1, fut1.result());
ASSERT_OK_AND_ASSIGN(auto buf2, fut2.result());

AssertBufferEqual(*buf1, Buffer(buffer.data() + 1, 1000));
AssertBufferEqual(*buf2, Buffer(buffer.data() + 3, 4));
}

TEST_F(TestMemoryMappedFile, InvalidReads) {
std::string path = "io-memory-map-invalid-reads-test";
ASSERT_OK_AND_ASSIGN(auto result, InitMemoryMap(4096, path));
Expand Down

0 comments on commit b785fdc

Please sign in to comment.