
PARQUET-1422: [C++] Use common Arrow IO interfaces throughout codebase #4404

Closed · wants to merge 18 commits

Conversation

@wesm (Member) commented May 29, 2019:

This is a long-overdue unification of platform code that wasn't possible until after the monorepo merge last year. It should also let us take a more consistent approach to asynchronous IO.

A backwards-compatibility layer is provided for the now-deprecated parquet::RandomAccessSource and parquet::OutputStream classes.

Some incidental changes were required to get things to work:

  • ARROW-5428: Adding a "read extent" option to BufferedInputStream to limit the extent of bytes read from the underlying raw stream
  • arrow::io::InputStream::Peek needed to have its API changed to return Status, because of the next point
  • arrow::io::BufferedOutputStream::Peek will expand the buffer if a Peek is requested that is larger than the buffer. The idea is that it should be possible to "look ahead" in the stream without altering the stream position. This is needed as part of finding the next data header (which can be large or small depending on statistics size, etc.) in a Parquet stream
  • Added an operator[] to Buffer to facilitate testing
  • Some continued "flattening" of the "parquet/util" directory to be simpler

Some outstanding questions:

  • The Parquet reader and writer classes assumed exclusive ownership of the file handles, and they are closed when the Parquet file is closed. Arrow files are shared, and so calling Close is not appropriate. I've attempted to preserve this logic by having Close called in the destructors of the wrapper classes in parquet/deprecated_io.h

An issue I ran into:

  • Changes in d82ac40 introduced a unit test with meaningful trailing whitespace, which my editor strips away. I've commented out the offending test and will open a JIRA about fixing it

@wesm (Member Author) commented May 29, 2019:

@majetideepak I'm sensitive to how this might impact your use of these classes, so if something doesn't build right with these changes please let me know and I'll fix it. The only APIs that changed were relatively internal ones.


ParquetInputWrapper::~ParquetInputWrapper() {
  if (!closed_) {
    source_->Close();
Contributor:

Probably safe to put a try-catch block here since this can throw inside a destructor.
In the old form:

~SerializedFile() override {
    try {
      Close();
    } catch (...) {
    }
  }

@majetideepak (Contributor) left a comment:

@wesm Thanks for the heads-up. The deprecated API should help us for now.
Regarding your open question on Parquet semantics of handling the ownership of files, I think we can leave the file ownership to the client.
Since this PR deprecates the RandomAccessSource and OutputStream APIs, the following APIs must be deprecated as well:

std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
    std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props,
    const std::shared_ptr<FileMetaData>& metadata);
std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
    const std::shared_ptr<OutputStream>& sink,
    const std::shared_ptr<schema::GroupNode>& schema,
    const std::shared_ptr<WriterProperties>& properties,
    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata);

CC: @AnatoliShein

@majetideepak (Contributor) commented:

I just saw that you have ARROW_DEPRECATED("Use arrow::io::RandomAccessFile version") for ParquetFileReader::Open(). You have to add a similar one for ParquetFileWriter::Open()

@wesm (Member Author) commented May 29, 2019:

Super weird failure in Travis CI:

[ 11%] Building CXX object CMakeFiles/_orc.dir/_orc.cpp.o
/home/travis/build/apache/arrow/python/build/temp.linux-x86_64-2.7/_orc.cpp: In function 'PyCodeObject* __Pyx_createFrameCodeObject(const char*, const char*, int)':
/home/travis/build/apache/arrow/python/build/temp.linux-x86_64-2.7/_orc.cpp:5770:1: error: label 'bad' defined but not used [-Werror=unused-label]
 bad:
 ^~~
cc1plus: all warnings being treated as errors

It looks like a new Cython was just released on conda-forge (https://github.com/conda-forge/cython-feedstock) -- @pitrou @kszucs have you seen other issues?

@wesm (Member Author) commented May 29, 2019:

@majetideepak I addressed your comments and added unit tests for the wrapper classes as an extra safety measure.

@pitrou (Member) left a comment:

I reviewed the Arrow changes and skimmed through the Parquet changes for the most part.

int64_t total_avail = bytes_buffered_;

if (raw_read_bound_ > 0) {
  total_avail += raw_read_bound_ - raw_read_total_;
Member:

I don't understand why you're doing this here. total_avail is what's left in the buffer, not in the whole stream.

Member Author:

In Parquet, it is peeking ahead into the available bytes in the raw stream to look for a data page header. So the total number of bytes available to peek is the number of bytes buffered plus any known additional bytes in the raw stream (as indicated by the bound parameter).

Member:

But that's only if raw_read_bound_ > 0. Otherwise you're only considering the number of buffered bytes... That seems inconsistent.

Member Author:

Well, we are in a difficult place with respect to finding the next data page in the stream. Can you look at where Peek is used in parquet/column_reader.cc so we can focus on addressing the core issue?

Member Author:

If raw_read_bound_ is not set, then the total_avail should be treated as unbounded, does that sound reasonable?

Member:

Right.

Member Author:

Cleaned this up and added tests for the unbounded case

// Read more data when buffer has insufficient left
if (nbytes > bytes_buffered_) {
  // Read as much as possible to fill the buffer, but not past stream end
  int64_t read_size = std::min(nbytes - bytes_buffered_, total_avail);
Member:

Is the min needed? There's already nbytes = std::min(nbytes, total_avail) above.

Member Author:

addressed in cleanup

Status Peek(int64_t nbytes, util::string_view* out) {
  int64_t total_avail = bytes_buffered_;

  if (raw_read_bound_ > 0) {
Member:

This should be raw_read_bound_ >= 0 below.

Member Author:

Will fix

Status InputStream::Peek(int64_t ARROW_ARG_UNUSED(nbytes), util::string_view* out) {
  *out = util::string_view(nullptr, 0);
  return Status::OK();
Member:

Why not return NotImplemented here now that we're returning a Status?

Member Author:

Well, it didn't fail before. Do you think it should fail?

Member:

Because it's not able to peek (it returns an empty string_view with a nullptr). Previously we weren't returning a Status, so we couldn't fail explicitly.

Member Author:

done

if (!is_open_) {
  *out = {};
  return Status::OK();
Member:

Should return Status::Invalid like other methods.

Member Author:

Same question as above

Member:

Same answer: not returning a Status meant we couldn't fail explicitly :-)

Member Author:

done

@wesm (Member Author) commented May 29, 2019:

I'll address the comments and get CI passing tomorrow. It would be great to get this merged later tomorrow or Friday so that other Parquet PRs can rebase if necessary.

@wesm (Member Author) commented May 30, 2019:

I think I've addressed all the feedback. I'm going to merge this when the build is green if there are no objections.

@wesm (Member Author) commented May 30, 2019:

Oh, glib needs some fixing

constexpr int64_t kDefaultOutputStreamSize = 1024;

std::shared_ptr<::arrow::io::BufferOutputStream> CreateOutputStream(
    ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
Member:

It seems that PARQUET_EXPORT is missing.

https://ci.appveyor.com/project/ApacheSoftwareFoundation/arrow/builds/24938354/job/8eqg024oo1mim9y8#L3103

bloom_filter-test.cc.obj : error LNK2019: unresolved external symbol "class std::shared_ptr<class arrow::io::BufferOutputStream> __cdecl parquet::CreateOutputStream(class arrow::MemoryPool *)" (?CreateOutputStream@parquet@@YA?AV?$shared_ptr@VBufferOutputStream@io@arrow@@@std@@PEAVMemoryPool@arrow@@@Z) referenced in function "private: virtual void __cdecl parquet::test::BasicTest_TestBloomFilter_Test::TestBody(void)" (?TestBody@BasicTest_TestBloomFilter_Test@test@parquet@@EEAAXXZ)

Member Author:

Ah thanks. Fixing

// ----------------------------------------------------------------------
// Wrapper classes

class ParquetInputWrapper : public ::arrow::io::RandomAccessFile {
Member:

PARQUET_EXPORT is missing here too.

bool closed_;
};

class ParquetOutputWrapper : public ::arrow::io::OutputStream {
Member:

ditto.

@wesm (Member Author) commented May 31, 2019:

I think I have it sorted out now -- tried actually building on Windows finally =)

Rebased and will await CI

@wesm (Member Author) commented May 31, 2019:

Travis CI build is passing: https://travis-ci.org/wesm/arrow/builds/539704609. Merging

@wesm wesm closed this in ff2ee42 May 31, 2019
@wesm wesm deleted the parquet-use-arrow-io branch May 31, 2019 15:18
@majetideepak (Contributor) commented:

@wesm: @czxrrr was porting this new API to Vertica and discovered that arrow::io::BufferedInputStream::Peek() is not compatible with Parquet's implementation in this change.
The issue is that arrow::io::BufferedInputStream::Peek() (https://github.com/apache/arrow/blob/master/cpp/src/arrow/io/buffered.cc#L291) uses Read, which ends up modifying the raw stream's offset. Parquet's implementation here uses ReadAt, which is missing from Arrow's API.
What are your thoughts on fixing this?

@pitrou (Member) commented Aug 8, 2019:

BufferedInputStream takes an input stream, not a random-access file, so it cannot call ReadAt.

class PARQUET_EXPORT BufferedInputStream : public InputStream {
 public:
  BufferedInputStream(::arrow::MemoryPool* pool, int64_t buffer_size,
                      RandomAccessSource* source, int64_t start, int64_t end);
Contributor:

The Arrow version takes an InputStream instead of RandomAccessSource. This is an issue since InputStream does not have ReadAt.

Member:

Well, yes, there's a reason it's called BufferedInputStream ;-)
Feel free to work on a BufferedRandomFile (also good luck finding the right design).

@wesm (Member Author) commented Aug 8, 2019:

Can you open a new JIRA issue about investigating this?

std::shared_ptr<::arrow::io::BufferedInputStream> stream;
PARQUET_THROW_NOT_OK(source->Seek(start));
PARQUET_THROW_NOT_OK(::arrow::io::BufferedInputStream::Create(
    buffer_size_, pool_, source, &stream, num_bytes));
Contributor:

The ArrowInputFile is a RandomAccessFile, but ::arrow::io::BufferedInputStream takes an InputStream. Treating the RandomAccessFile as an InputStream is incorrect, since ::arrow::io::BufferedInputStream::Peek causes the RandomAccessFile offset to change.

Member:

Generally speaking, when using a buffering layer, the buffering layer owns the underlying raw file. Using both at once is incorrect. This is not Arrow-specific, but happens in any language (Python, C, etc.).

@wesm (Member Author) commented Aug 8, 2019:

This requirement was not explicitly spelled out before; it only behaved as you wanted "by accident" (I think). Let's open a new JIRA and see what can be done.
