Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/file_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class DelayedBufferReader : public ::arrow::io::BufferReader {
return DeferNotOk(::arrow::io::internal::SubmitIO(
io_context, [self, position, nbytes]() -> Result<std::shared_ptr<Buffer>> {
std::this_thread::sleep_for(std::chrono::seconds(1));
return self->DoReadAt(position, nbytes);
return self->DoReadAt(position, nbytes, /*allow_short_read=*/false);
}));
}

Expand Down
26 changes: 18 additions & 8 deletions cpp/src/arrow/gpu/cuda_memory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -260,28 +260,38 @@ Status CudaBufferReader::DoSeek(int64_t position) {
}

Result<int64_t> CudaBufferReader::DoReadAt(int64_t position, int64_t nbytes,
void* buffer) {
bool allow_short_read, void* buffer) {
RETURN_NOT_OK(CheckClosed());

nbytes = std::min(nbytes, size_ - position);
RETURN_NOT_OK(context_->CopyDeviceToHost(buffer, address_ + position, nbytes));
return nbytes;
auto real_nbytes = std::min(nbytes, size_ - position);
if (!allow_short_read && real_nbytes != nbytes) {
return Status::IOError("Cuda buffer too short: expected to be able to read ", nbytes,
" bytes, got ", real_nbytes);
}
RETURN_NOT_OK(context_->CopyDeviceToHost(buffer, address_ + position, real_nbytes));
return real_nbytes;
}

Result<int64_t> CudaBufferReader::DoRead(int64_t nbytes, void* buffer) {
RETURN_NOT_OK(CheckClosed());

ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, DoReadAt(position_, nbytes, buffer));
ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
DoReadAt(position_, nbytes, /*allow_short_read=*/true, buffer));
position_ += bytes_read;
return bytes_read;
}

Result<std::shared_ptr<Buffer>> CudaBufferReader::DoReadAt(int64_t position,
int64_t nbytes) {
int64_t nbytes,
bool allow_short_read) {
RETURN_NOT_OK(CheckClosed());

int64_t size = std::min(nbytes, size_ - position);
return std::make_shared<CudaBuffer>(buffer_, position, size);
auto real_nbytes = std::min(nbytes, size_ - position);
if (!allow_short_read && real_nbytes != nbytes) {
return Status::IOError("Cuda buffer too short: expected to be able to read ", nbytes,
" bytes, got ", real_nbytes);
}
return std::make_shared<CudaBuffer>(buffer_, position, real_nbytes);
}

Result<std::shared_ptr<Buffer>> CudaBufferReader::DoRead(int64_t nbytes) {
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/arrow/gpu/cuda_memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,10 @@ class ARROW_CUDA_EXPORT CudaBufferReader

Result<int64_t> DoRead(int64_t nbytes, void* buffer);
Result<std::shared_ptr<Buffer>> DoRead(int64_t nbytes);
Result<int64_t> DoReadAt(int64_t position, int64_t nbytes, void* out);
Result<std::shared_ptr<Buffer>> DoReadAt(int64_t position, int64_t nbytes);
Result<int64_t> DoReadAt(int64_t position, int64_t nbytes, bool allow_short_read,
void* out);
Result<std::shared_ptr<Buffer>> DoReadAt(int64_t position, int64_t nbytes,
bool allow_short_read);

Result<int64_t> DoTell() const;
Status DoSeek(int64_t position);
Expand Down
9 changes: 6 additions & 3 deletions cpp/src/arrow/io/caching.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ struct ReadRangeCache::Impl {
std::vector<RangeCacheEntry> new_entries;
new_entries.reserve(ranges.size());
for (const auto& range : ranges) {
new_entries.emplace_back(range, file->ReadAsync(ctx, range.offset, range.length));
new_entries.emplace_back(range, file->ReadAsync(ctx, range.offset, range.length,
/*allow_short_read=*/false));
}
return new_entries;
}
Expand Down Expand Up @@ -219,7 +220,8 @@ struct ReadRangeCache::Impl {
++next_it) {
if (!next_it->future.is_valid()) {
next_it->future =
file->ReadAsync(ctx, next_it->range.offset, next_it->range.length);
file->ReadAsync(ctx, next_it->range.offset, next_it->range.length,
/*allow_short_read=*/false);
}
++num_prefetched;
}
Expand Down Expand Up @@ -272,7 +274,8 @@ struct ReadRangeCache::LazyImpl : public ReadRangeCache::Impl {
Future<std::shared_ptr<Buffer>> MaybeRead(RangeCacheEntry* entry) override {
// Called by superclass Read()/WaitFor() so we have the lock
if (!entry->future.is_valid()) {
entry->future = file->ReadAsync(ctx, entry->range.offset, entry->range.length);
entry->future = file->ReadAsync(ctx, entry->range.offset, entry->range.length,
/*allow_short_read=*/false);
}
return entry->future;
}
Expand Down
14 changes: 12 additions & 2 deletions cpp/src/arrow/io/concurrency.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,23 @@ class RandomAccessFileConcurrencyWrapper : public RandomAccessFile {
// to use the exclusive_guard.

Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) final {
  // Overload without `allow_short_read`: delegate to the overload that takes
  // the flag, with short reads permitted.
  constexpr bool kAllowShortRead = true;
  return ReadAt(position, nbytes, kAllowShortRead, out);
}

Result<int64_t> ReadAt(int64_t position, int64_t nbytes, bool allow_short_read,
void* out) final {
auto guard = lock_.shared_guard();
return derived()->DoReadAt(position, nbytes, out);
return derived()->DoReadAt(position, nbytes, allow_short_read, out);
}

Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) final {
  // Overload without `allow_short_read`: delegate to the buffer-returning
  // overload that takes the flag, with short reads permitted.
  constexpr bool kAllowShortRead = true;
  return ReadAt(position, nbytes, kAllowShortRead);
}

Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes,
bool allow_short_read) final {
auto guard = lock_.shared_guard();
return derived()->DoReadAt(position, nbytes);
return derived()->DoReadAt(position, nbytes, allow_short_read);
}

/*
Expand Down
36 changes: 22 additions & 14 deletions cpp/src/arrow/io/file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,21 @@ class OSFile {
return ::arrow::internal::FileRead(fd_.fd(), reinterpret_cast<uint8_t*>(out), nbytes);
}

Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) {
Result<int64_t> ReadAt(int64_t position, int64_t nbytes, bool allow_short_read,
void* out) {
RETURN_NOT_OK(CheckClosed());
RETURN_NOT_OK(internal::ValidateRange(position, nbytes));
// ReadAt() leaves the file position undefined, so require that we seek
// before calling Read() or Write().
need_seeking_.store(true);
return ::arrow::internal::FileReadAt(fd_.fd(), reinterpret_cast<uint8_t*>(out),
position, nbytes);
ARROW_ASSIGN_OR_RAISE(auto real_nbytes, ::arrow::internal::FileReadAt(
fd_.fd(), reinterpret_cast<uint8_t*>(out),
position, nbytes));
if (!allow_short_read && real_nbytes != nbytes) {
return Status::IOError("File too short: expected to be able to read ", nbytes,
" bytes, got ", real_nbytes);
}
return real_nbytes;
}

Status Seek(int64_t pos) {
Expand Down Expand Up @@ -230,21 +237,20 @@ class ReadableFile::ReadableFileImpl : public OSFile {
RETURN_NOT_OK(buffer->Resize(bytes_read));
buffer->ZeroPadding();
}
// R build with openSUSE155 requires an explicit shared_ptr construction
return std::shared_ptr<Buffer>(std::move(buffer));
return buffer;
}

Result<std::shared_ptr<Buffer>> ReadBufferAt(int64_t position, int64_t nbytes) {
Result<std::shared_ptr<Buffer>> ReadBufferAt(int64_t position, int64_t nbytes,
bool allow_short_read) {
ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_));

ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
ReadAt(position, nbytes, buffer->mutable_data()));
ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(position, nbytes, allow_short_read,
buffer->mutable_data()));
if (bytes_read < nbytes) {
RETURN_NOT_OK(buffer->Resize(bytes_read));
buffer->ZeroPadding();
}
// R build with openSUSE155 requires an explicit shared_ptr construction
return std::shared_ptr<Buffer>(std::move(buffer));
return buffer;
}

Status WillNeed(const std::vector<ReadRange>& ranges) {
Expand Down Expand Up @@ -322,12 +328,14 @@ Result<int64_t> ReadableFile::DoRead(int64_t nbytes, void* out) {
return impl_->Read(nbytes, out);
}

Result<int64_t> ReadableFile::DoReadAt(int64_t position, int64_t nbytes, void* out) {
return impl_->ReadAt(position, nbytes, out);
Result<int64_t> ReadableFile::DoReadAt(int64_t position, int64_t nbytes,
bool allow_short_read, void* out) {
return impl_->ReadAt(position, nbytes, allow_short_read, out);
}

Result<std::shared_ptr<Buffer>> ReadableFile::DoReadAt(int64_t position, int64_t nbytes) {
return impl_->ReadBufferAt(position, nbytes);
Result<std::shared_ptr<Buffer>> ReadableFile::DoReadAt(int64_t position, int64_t nbytes,
bool allow_short_read) {
return impl_->ReadBufferAt(position, nbytes, allow_short_read);
}

Result<std::shared_ptr<Buffer>> ReadableFile::DoRead(int64_t nbytes) {
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/arrow/io/file.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,12 @@ class ARROW_EXPORT ReadableFile
Result<std::shared_ptr<Buffer>> DoRead(int64_t nbytes);

/// \brief Thread-safe implementation of ReadAt
Result<int64_t> DoReadAt(int64_t position, int64_t nbytes, void* out);
Result<int64_t> DoReadAt(int64_t position, int64_t nbytes, bool allow_short_read,
void* out);

/// \brief Thread-safe implementation of ReadAt
Result<std::shared_ptr<Buffer>> DoReadAt(int64_t position, int64_t nbytes);
Result<std::shared_ptr<Buffer>> DoReadAt(int64_t position, int64_t nbytes,
bool allow_short_read);

Result<int64_t> DoGetSize();
Status DoSeek(int64_t position);
Expand Down
11 changes: 9 additions & 2 deletions cpp/src/arrow/io/file_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <thread>
#include <vector>

#include <gmock/gmock-matchers.h>
#include <gtest/gtest.h>

#include "arrow/buffer.h"
Expand Down Expand Up @@ -399,12 +400,18 @@ TEST_F(TestReadableFile, ReadAsync) {
MakeTestFile();
OpenFile();

auto fut1 = file_->ReadAsync({}, 1, 10);
auto fut2 = file_->ReadAsync({}, 0, 4);
auto fut1 = file_->ReadAsync(default_io_context(), 1, 10);
auto fut2 = file_->ReadAsync(default_io_context(), 0, 4);
auto fut3 = file_->ReadAsync(default_io_context(), 1, 10, /*allow_short_read=*/false);
auto fut4 = file_->ReadAsync(default_io_context(), 0, 4, /*allow_short_read=*/false);
ASSERT_OK_AND_ASSIGN(auto buf1, fut1.result());
ASSERT_OK_AND_ASSIGN(auto buf2, fut2.result());
EXPECT_RAISES_WITH_MESSAGE_THAT(IOError, ::testing::HasSubstr("File too short"),
fut3.result());
ASSERT_OK_AND_ASSIGN(auto buf4, fut4.result());
AssertBufferEqual(*buf1, "estdata");
AssertBufferEqual(*buf2, "test");
AssertBufferEqual(*buf4, "test");
}

TEST_F(TestReadableFile, ReadManyAsync) {
Expand Down
45 changes: 40 additions & 5 deletions cpp/src/arrow/io/interfaces.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,38 +149,73 @@ RandomAccessFile::~RandomAccessFile() = default;

RandomAccessFile::RandomAccessFile() : interface_impl_(new Impl()) {}

Result<int64_t> RandomAccessFile::ReadAt(int64_t position, int64_t nbytes,
                                         bool allow_short_read, void* out) {
  // Perform the read through the three-argument overload, then enforce the
  // requested length here.
  ARROW_ASSIGN_OR_RAISE(const int64_t bytes_read, ReadAt(position, nbytes, out));
  const bool length_matches = (bytes_read == nbytes);
  if (length_matches || allow_short_read) {
    return bytes_read;
  }
  // The caller required exactly `nbytes` bytes and the read returned a
  // different count.
  return Status::IOError("File too short: expected to be able to read ", nbytes,
                         " bytes, got ", bytes_read);
}

// Positional read implemented as Seek() followed by Read().
//
// Seeks to `position`, then reads `nbytes` into `out` via Read() and returns
// that call's result. A failing Seek() is propagated before any read happens.
Result<int64_t> RandomAccessFile::ReadAt(int64_t position, int64_t nbytes, void* out) {
// Hold the interface mutex across both calls so the Seek()/Read() pair of one
// ReadAt() cannot interleave with the pair of another taking the same lock.
std::lock_guard<std::mutex> lock(interface_impl_->lock_);
RETURN_NOT_OK(Seek(position));
return Read(nbytes, out);
}

Result<std::shared_ptr<Buffer>> RandomAccessFile::ReadAt(int64_t position, int64_t nbytes,
bool allow_short_read) {
ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(position, nbytes));
// XXX the internal `IoRecordedRandomAccessFile` can return a null buffer
if (!allow_short_read && buffer && buffer->size() != nbytes) {
return Status::IOError("File too short: expected to be able to read ", nbytes,
" bytes, got ", buffer->size());
}
Comment thread
lidavidm marked this conversation as resolved.
return buffer;
}

// Buffer-returning positional read implemented as Seek() followed by Read().
//
// Seeks to `position`, then returns the result of Read(nbytes). A failing
// Seek() is propagated before any read happens.
Result<std::shared_ptr<Buffer>> RandomAccessFile::ReadAt(int64_t position,
int64_t nbytes) {
// Hold the interface mutex across both calls so the Seek()/Read() pair of one
// ReadAt() cannot interleave with the pair of another taking the same lock.
std::lock_guard<std::mutex> lock(interface_impl_->lock_);
RETURN_NOT_OK(Seek(position));
return Read(nbytes);
}

// ReadAsync() overload without `allow_short_read`: forwards to the overload
// that takes the flag, with short reads permitted.
Future<std::shared_ptr<Buffer>> RandomAccessFile::ReadAsync(const IOContext& ctx,
                                                            int64_t position,
                                                            int64_t nbytes) {
  constexpr bool kAllowShortRead = true;
  return ReadAsync(ctx, position, nbytes, kAllowShortRead);
}

// Default ReadAsync() implementation: simply issue the read on the context's executor
Future<std::shared_ptr<Buffer>> RandomAccessFile::ReadAsync(const IOContext& ctx,
int64_t position,
int64_t nbytes,
bool allow_short_read) {
auto self = std::dynamic_pointer_cast<RandomAccessFile>(shared_from_this());
return DeferNotOk(internal::SubmitIO(
ctx, [self, position, nbytes] { return self->ReadAt(position, nbytes); }));
return DeferNotOk(internal::SubmitIO(ctx, [self, position, nbytes, allow_short_read] {
return self->ReadAt(position, nbytes, allow_short_read);
}));
}

Future<std::shared_ptr<Buffer>> RandomAccessFile::ReadAsync(int64_t position,
int64_t nbytes) {
return ReadAsync(io_context(), position, nbytes);
return ReadAsync(io_context(), position, nbytes, /*allow_short_read=*/true);
}

Future<std::shared_ptr<Buffer>> RandomAccessFile::ReadAsync(int64_t position,
                                                            int64_t nbytes,
                                                            bool allow_short_read) {
  // No IOContext supplied by the caller: use the one returned by io_context().
  const auto& ctx = io_context();
  return ReadAsync(ctx, position, nbytes, allow_short_read);
}

std::vector<Future<std::shared_ptr<Buffer>>> RandomAccessFile::ReadManyAsync(
const IOContext& ctx, const std::vector<ReadRange>& ranges) {
std::vector<Future<std::shared_ptr<Buffer>>> ret;
for (auto r : ranges) {
ret.push_back(this->ReadAsync(ctx, r.offset, r.length));
ret.push_back(this->ReadAsync(ctx, r.offset, r.length, /*allow_short_read=*/false));
}
return ret;
}
Expand Down
39 changes: 36 additions & 3 deletions cpp/src/arrow/io/interfaces.h
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,9 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable {

/// \brief Read data from given file position.
///
/// At most `nbytes` bytes are read. The number of bytes read is returned
/// (it can be less than `nbytes` if EOF is reached).
/// At most `nbytes` bytes are read. The number of bytes read is returned.
/// If `allow_short_read` is true, the number of bytes read can be less than
/// `nbytes` if EOF is reached, otherwise an error is returned.
///
/// This method can be safely called from multiple threads concurrently.
/// It is unspecified whether this method updates the file position or not.
Expand All @@ -279,24 +280,56 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable {
///
/// \param[in] position Where to read bytes from
/// \param[in] nbytes The number of bytes to read
/// \param[in] allow_short_read Whether to allow reading less than `nbytes`
/// \param[out] out The buffer to read bytes into
/// \return The number of bytes read, or an error
virtual Result<int64_t> ReadAt(int64_t position, int64_t nbytes, bool allow_short_read,
void* out);

/// \brief Read data from given file position.
///
/// Like `ReadAt(position, nbytes, allow_short_read, out)` with `allow_short_read`
/// set to true.
Comment on lines +291 to +292
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should deprecate these overloads over time (it feels like it would be safer to have allow_short_read be the opt-in rather than opt-out behavior at least)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By "these overloads", you mean those without the allow_short_read parameter, right?

And, yes, I agree that disallowing short reads by default would definitely be safer. Short reads by default is fine in a "safe" language like Python, not so much in C++.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think it would be safer if they were eventually removed to avoid this cropping up.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah sorry I missed your comment above. Yes, I agree, we should do that in a later PR.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I'll open a separate issue and PR for deprecation)

///
/// \param[in] position Where to read bytes from
/// \param[in] nbytes The number of bytes to read
/// \param[out] out The buffer to read bytes into
/// \return The number of bytes read, or an error
virtual Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out);

/// \brief Read data from given file position.
///
/// At most `nbytes` bytes are read, but it can be less if EOF is reached.
/// At most `nbytes` bytes are read. If `allow_short_read` is true, the
/// number of bytes read can be less than `nbytes` if EOF is reached,
/// otherwise an error is returned.
///
/// \param[in] position Where to read bytes from
/// \param[in] nbytes The number of bytes to read
/// \param[in] allow_short_read Whether to allow reading less than `nbytes`
/// \return A buffer containing the bytes read, or an error
virtual Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes,
bool allow_short_read);

/// \brief Read data from given file position.
///
/// Like `ReadAt(position, nbytes, allow_short_read)` with `allow_short_read`
/// set to true.
///
/// \param[in] position Where to read bytes from
/// \param[in] nbytes The number of bytes to read
/// \return A buffer containing the bytes read, or an error
virtual Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes);

/// EXPERIMENTAL: Read data asynchronously.
virtual Future<std::shared_ptr<Buffer>> ReadAsync(const IOContext&, int64_t position,
int64_t nbytes,
bool allow_short_read);
virtual Future<std::shared_ptr<Buffer>> ReadAsync(const IOContext&, int64_t position,
int64_t nbytes);

/// EXPERIMENTAL: Read data asynchronously, using the file's IOContext.
Future<std::shared_ptr<Buffer>> ReadAsync(int64_t position, int64_t nbytes,
bool allow_short_read);
Future<std::shared_ptr<Buffer>> ReadAsync(int64_t position, int64_t nbytes);

/// EXPERIMENTAL: Explicit multi-read.
Expand Down
Loading
Loading