Skip to content

Commit

Permalink
Fix behavior of Tell for GcsFileSystem objects
Browse files (browse the repository at this point in the history)
  • Loading branch information
benibus committed Sep 21, 2022
1 parent 4130462 commit 93fa7dc
Showing 1 changed file with 28 additions and 12 deletions.
40 changes: 28 additions & 12 deletions cpp/src/arrow/filesystem/gcsfs.cc
Original file line number | Diff line number | Diff line change
Expand Up @@ -123,9 +123,20 @@ class GcsInputStream : public arrow::io::InputStream {
return Status::OK();
}

Result<int64_t> Tell() const override {
Result<int64_t> Tell() const override { return TellOr(nread_); }

// At EOF, gcs::ObjectReadStream::tellg() returns -1, but our APIs canonically return
// the stream size. This method helps with the conversion.
Result<int64_t> TellOr(int64_t max_pos) const {
if (closed()) return Status::Invalid("Cannot use Tell() on a closed stream");
return stream_.tellg();
int64_t pos = stream_.tellg();
if (pos < 0) {
if (!stream_.eof()) {
return Status::IOError("Tell() failed before end of stream");
}
return max_pos;
}
return pos;
}

// A gcs::ObjectReadStream can be "born closed". For small objects the stream returns
Expand All @@ -140,15 +151,19 @@ class GcsInputStream : public arrow::io::InputStream {
if (closed()) return Status::Invalid("Cannot read from a closed stream");
stream_.read(static_cast<char*>(out), nbytes);
ARROW_GCS_RETURN_NOT_OK(stream_.status());
return stream_.gcount();
int64_t nread = stream_.gcount();
nread_ += nread;
return nread;
}

Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
if (closed()) return Status::Invalid("Cannot read from a closed stream");
ARROW_ASSIGN_OR_RAISE(auto buffer, arrow::AllocateResizableBuffer(nbytes));
stream_.read(reinterpret_cast<char*>(buffer->mutable_data()), nbytes);
ARROW_GCS_RETURN_NOT_OK(stream_.status());
RETURN_NOT_OK(buffer->Resize(stream_.gcount(), true));
int64_t nread = stream_.gcount();
nread_ += nread;
RETURN_NOT_OK(buffer->Resize(nread, true));
return std::shared_ptr<Buffer>(std::move(buffer));
}
//@}
Expand All @@ -167,6 +182,7 @@ class GcsInputStream : public arrow::io::InputStream {
GcsPath path_;
gcs::Generation generation_;
gcs::Client client_;
int64_t nread_ = 0; // Total bytes consumed (updated after each Read())
bool closed_ = false;
};

Expand Down Expand Up @@ -226,13 +242,13 @@ class GcsOutputStream : public arrow::io::OutputStream {
bool closed_ = false;
};

using InputStreamFactory = std::function<Result<std::shared_ptr<io::InputStream>>(
using InputStreamFactory = std::function<Result<std::shared_ptr<GcsInputStream>>(
gcs::Generation, gcs::ReadFromOffset)>;

class GcsRandomAccessFile : public arrow::io::RandomAccessFile {
public:
GcsRandomAccessFile(InputStreamFactory factory, gcs::ObjectMetadata metadata,
std::shared_ptr<io::InputStream> stream)
std::shared_ptr<GcsInputStream> stream)
: factory_(std::move(factory)),
metadata_(std::move(metadata)),
stream_(std::move(stream)) {}
Expand All @@ -242,7 +258,7 @@ class GcsRandomAccessFile : public arrow::io::RandomAccessFile {
// @name FileInterface
Status Close() override { return stream_->Close(); }
Status Abort() override { return stream_->Abort(); }
Result<int64_t> Tell() const override { return stream_->Tell(); }
Result<int64_t> Tell() const override { return stream_->TellOr(metadata_.size()); }
bool closed() const override { return stream_->closed(); }
//@}

Expand Down Expand Up @@ -296,7 +312,7 @@ class GcsRandomAccessFile : public arrow::io::RandomAccessFile {
private:
InputStreamFactory factory_;
gcs::ObjectMetadata metadata_;
std::shared_ptr<io::InputStream> stream_;
std::shared_ptr<GcsInputStream> stream_;
};

google::cloud::Options AsGoogleCloudOptions(const GcsOptions& o) {
Expand Down Expand Up @@ -598,15 +614,15 @@ class GcsFileSystem::Impl {
return internal::ToArrowStatus(metadata.status());
}

Result<std::shared_ptr<io::InputStream>> OpenInputStream(const GcsPath& path,
gcs::Generation generation,
gcs::ReadFromOffset offset) {
Result<std::shared_ptr<GcsInputStream>> OpenInputStream(const GcsPath& path,
gcs::Generation generation,
gcs::ReadFromOffset offset) {
auto stream = client_.ReadObject(path.bucket, path.object, generation, offset);
ARROW_GCS_RETURN_NOT_OK(stream.status());
return std::make_shared<GcsInputStream>(std::move(stream), path, generation, client_);
}

Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
Result<std::shared_ptr<GcsOutputStream>> OpenOutputStream(
const GcsPath& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
std::shared_ptr<const KeyValueMetadata> resolved_metadata = metadata;
if (resolved_metadata == nullptr && options_.default_metadata != nullptr) {
Expand Down

0 comments on commit 93fa7dc

Please sign in to comment.