Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/src/arrow/adapters/orc/adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class ArrowInputFile : public liborc::InputStream {
uint64_t getNaturalReadSize() const override { return 128 * 1024; }

void read(void* buf, uint64_t length, uint64_t offset) override {
ORC_ASSIGN_OR_THROW(int64_t bytes_read, file_->ReadAt(offset, length, buf));
ORC_ASSIGN_OR_THROW(int64_t bytes_read, file_->ReadAt(offset, length, /*allow_short_read=*/true , buf));

if (static_cast<uint64_t>(bytes_read) != length) {
throw liborc::ParseError("Short read from arrow input file");
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/buffer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ TEST(TestBuffer, GetReader) {
auto buf = std::make_shared<Buffer>(data, data_str.size());
ASSERT_OK_AND_ASSIGN(auto reader, Buffer::GetReader(buf));
ASSERT_OK_AND_EQ(static_cast<int64_t>(data_str.size()), reader->GetSize());
ASSERT_OK_AND_ASSIGN(auto read_buf, reader->ReadAt(5, 4));
ASSERT_OK_AND_ASSIGN(auto read_buf, reader->ReadAt(5, 4,/*allow_short_read=*/true));
AssertBufferEqual(*read_buf, "data");
}

Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/io/file_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,8 @@ TEST_F(TestReadableFile, ReadAsync) {
MakeTestFile();
OpenFile();

auto fut1 = file_->ReadAsync(default_io_context(), 1, 10);
auto fut2 = file_->ReadAsync(default_io_context(), 0, 4);
auto fut1 = file_->ReadAsync(default_io_context(), 1, 10, /*allow_short_read=*/true);
auto fut2 = file_->ReadAsync(default_io_context(), 0, 4, /*allow_short_read=*/true);
auto fut3 = file_->ReadAsync(default_io_context(), 1, 10, /*allow_short_read=*/false);
auto fut4 = file_->ReadAsync(default_io_context(), 0, 4, /*allow_short_read=*/false);
ASSERT_OK_AND_ASSIGN(auto buf1, fut1.result());
Expand Down
8 changes: 6 additions & 2 deletions cpp/src/arrow/io/interfaces.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ RandomAccessFile::RandomAccessFile() : interface_impl_(new Impl()) {}

Result<int64_t> RandomAccessFile::ReadAt(int64_t position, int64_t nbytes,
bool allow_short_read, void* out) {
ARROW_SUPPRESS_DEPRECATION_WARNING
ARROW_ASSIGN_OR_RAISE(auto real_nbytes, ReadAt(position, nbytes, out));
ARROW_UNSUPPRESS_DEPRECATION_WARNING
if (!allow_short_read && real_nbytes != nbytes) {
return Status::IOError("File too short: expected to be able to read ", nbytes,
" bytes, got ", real_nbytes);
Expand All @@ -167,7 +169,9 @@ Result<int64_t> RandomAccessFile::ReadAt(int64_t position, int64_t nbytes, void*

Result<std::shared_ptr<Buffer>> RandomAccessFile::ReadAt(int64_t position, int64_t nbytes,
bool allow_short_read) {
ARROW_SUPPRESS_DEPRECATION_WARNING
ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(position, nbytes));
ARROW_UNSUPPRESS_DEPRECATION_WARNING
// XXX the internal `IoRecordedRandomAccessFile` can return a null buffer
if (!allow_short_read && buffer && buffer->size() != nbytes) {
return Status::IOError("File too short: expected to be able to read ", nbytes,
Expand Down Expand Up @@ -277,7 +281,7 @@ class FileSegmentReader
RETURN_NOT_OK(CheckOpen());
int64_t bytes_to_read = std::min(nbytes, nbytes_ - position_);
ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
file_->ReadAt(file_offset_ + position_, bytes_to_read, out));
file_->ReadAt(file_offset_ + position_, bytes_to_read,/*allow_short_read=*/true, out));
position_ += bytes_read;
return bytes_read;
}
Expand All @@ -286,7 +290,7 @@ class FileSegmentReader
RETURN_NOT_OK(CheckOpen());
int64_t bytes_to_read = std::min(nbytes, nbytes_ - position_);
ARROW_ASSIGN_OR_RAISE(auto buffer,
file_->ReadAt(file_offset_ + position_, bytes_to_read));
file_->ReadAt(file_offset_ + position_, bytes_to_read , /*allow_short_read=*/true));
position_ += buffer->size();
return buffer;
}
Expand Down
5 changes: 4 additions & 1 deletion cpp/src/arrow/io/interfaces.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
#include "arrow/util/macros.h"
#include "arrow/util/type_fwd.h"
#include "arrow/util/visibility.h"

namespace arrow {
namespace io {

Expand Down Expand Up @@ -295,6 +294,7 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable {
/// \param[in] nbytes The number of bytes to read
/// \param[out] out The buffer to read bytes into
/// \return The number of bytes read, or an error
ARROW_DEPRECATED("Deprecated in 17.0.0. Use signature with allow_short_read instead")
virtual Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out);

/// \brief Read data from given file position.
Expand All @@ -318,18 +318,21 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable {
/// \param[in] position Where to read bytes from
/// \param[in] nbytes The number of bytes to read
/// \return A buffer containing the bytes read, or an error
ARROW_DEPRECATED("Deprecated in 17.0.0. Use signature with allow_short_read instead")
virtual Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes);

/// EXPERIMENTAL: Read data asynchronously.
virtual Future<std::shared_ptr<Buffer>> ReadAsync(const IOContext&, int64_t position,
int64_t nbytes,
bool allow_short_read);
ARROW_DEPRECATED("Deprecated in 17.0.0. Use signature with allow_short_read instead")
virtual Future<std::shared_ptr<Buffer>> ReadAsync(const IOContext&, int64_t position,
int64_t nbytes);

/// EXPERIMENTAL: Read data asynchronously, using the file's IOContext.
Future<std::shared_ptr<Buffer>> ReadAsync(int64_t position, int64_t nbytes,
bool allow_short_read);
ARROW_DEPRECATED("Deprecated in 17.0.0. Use signature with allow_short_read instead")
Future<std::shared_ptr<Buffer>> ReadAsync(int64_t position, int64_t nbytes);

/// EXPERIMENTAL: Explicit multi-read.
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/io/slow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ Result<std::shared_ptr<Buffer>> SlowRandomAccessFile::Read(int64_t nbytes) {
Result<int64_t> SlowRandomAccessFile::ReadAt(int64_t position, int64_t nbytes,
void* out) {
latencies_->Sleep();
return stream_->ReadAt(position, nbytes, out);
return stream_->ReadAt(position, nbytes,/*allow_short_read=*/true ,out);
}

Result<int64_t> SlowRandomAccessFile::ReadAt(int64_t position, int64_t nbytes,
Expand All @@ -143,7 +143,7 @@ Result<int64_t> SlowRandomAccessFile::ReadAt(int64_t position, int64_t nbytes,
Result<std::shared_ptr<Buffer>> SlowRandomAccessFile::ReadAt(int64_t position,
int64_t nbytes) {
latencies_->Sleep();
return stream_->ReadAt(position, nbytes);
return stream_->ReadAt(position, nbytes , /*allow_short_read=*/true);
}

Result<std::shared_ptr<Buffer>> SlowRandomAccessFile::ReadAt(int64_t position,
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/io/test_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class TrackedRandomAccessFileImpl : public TrackedRandomAccessFile {
Result<int64_t> GetSize() override { return delegate_->GetSize(); }
Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override {
SaveReadRange(position, nbytes);
return delegate_->ReadAt(position, nbytes, out);
return delegate_->ReadAt(position, nbytes , /*allow_short_read=*/true ,out);
}
Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) override {
return ReadAt(position, nbytes, /*allow_short_read=*/true);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/ipc/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ Status ReadFieldsSubset(int64_t offset, int32_t metadata_length,
RETURN_NOT_OK(fields_loader(batch, &io_recorded_random_access_file));
const auto& read_ranges = io_recorded_random_access_file.GetReadRanges();
for (const auto& range : read_ranges) {
auto read_result = file->ReadAt(offset + metadata_length + range.offset, range.length,
auto read_result = file->ReadAt(offset + metadata_length + range.offset, range.length,/*allow_short_read=*/true,
body->mutable_data() + range.offset);
if (!read_result.ok()) {
return Status::IOError("Failed to read message body, error ",
Expand Down