Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-6720:[JAVA][C++]Support Parquet Read and Write in Java #5522

Closed
wants to merge 4 commits into from

Conversation

xuechendi
Copy link

Added a new java interface to support parquet read and write from hdfs or local file.

The purpose of this implementation is that when we loading and dumping parquet data in Java, we can only use rowBased put and get methods. Since arrow already has C++ implementation to load and dump parquet, so we wrapped those codes as Java APIs.

After test, we noticed in our workload, performance improved more than 2x comparing with rowBased load and dump. So we want to contribute codes to arrow.

This is a total independent change:
c++ codes are under cpp/jni/parquet, used as jni bridge.
java codes are under java/adapter/parquet, ParquetReader and ParquetWriter is the main interface class. Java UnitTest is also included.

Signed-off-by: Chendi Xue chendi.xue@intel.com

@xuechendi xuechendi force-pushed the wip_parquet branch 2 times, most recently from 4b537e0 to 38055e1 Compare September 27, 2019 12:26
@wesm
Copy link
Member

wesm commented Sep 27, 2019

I will review the C++ part of this in more detail when I can, but as a high level comment we will need the code itself to follow the Google C++ style guide and also conform to our file naming conventions (lower_with_underscores.h/cc).

@emkornfield
Copy link
Contributor

emkornfield commented Sep 27, 2019

Another high level comment is this seems to duplicate a lot of code from ORC adapter (the thread-safe map popped out as one example). Is it possible to refactor to share common implementations?

@xuechendi
Copy link
Author

Another high level comment is this seems to duplicate a lot of code from ORC adapter (the thread-safe map popped out as one example). Is it share common implementations?

@emkornfield, yes, currentmap was copied from orc, so as some codes in java side, the reason I did as that way is that I was thinking to limit my modification to existing codes, so I copied it to parquet folder.
Better way is to make the duplicated part as a common lib, but I am thinking will it be ok if I open another PR or commit for the codes refine?

@xuechendi
Copy link
Author

I will review the C++ part of this in more detail when I can, but as a high level comment we will need the code itself to follow the Google C++ style guide and also conform to our file naming conventions (lower_with_underscores.h/cc).

@wesm , yes, I noticed that, and I am working on to fixing the coding style problem, wish to pass CI firstly. And thanks for the code review.

@github-actions
Copy link


Status FileConnector::openReadable() {
Status msg = ReadableFile::Open(filePath, &fileReader);
if (!msg.ok()) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, all of these instances where exit is called when a non-OK Status is returned should be changed to just return the Status.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lihalite , I see, thanks, I will fix that.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lihalite , I have fixed the issue. Thanks

@codecov-io
Copy link

codecov-io commented Oct 1, 2019

Codecov Report

Merging #5522 into master will decrease coverage by 10.86%.
The diff coverage is n/a.

Impacted file tree graph

@@             Coverage Diff             @@
##           master    #5522       +/-   ##
===========================================
- Coverage   88.82%   77.95%   -10.87%     
===========================================
  Files         983       59      -924     
  Lines      132974     4554   -128420     
  Branches     1501        0     -1501     
===========================================
- Hits       118108     3550   -114558     
+ Misses      14501     1004    -13497     
+ Partials      365        0      -365
Impacted Files Coverage Δ
python/pyarrow/ipc.pxi
cpp/src/parquet/column_page.h
cpp/src/plasma/test/external_store_tests.cc
cpp/src/arrow/array/builder_decimal.cc
cpp/src/plasma/client.cc
.../thirdparty/flatbuffers/include/flatbuffers/base.h
cpp/src/arrow/flight/internal.cc
cpp/src/arrow/compute/compute_test.cc
cpp/src/arrow/python/io.cc
python/pyarrow/hdfs.py
... and 914 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 4db8f7b...b248e64. Read the comment docs.

@xuechendi xuechendi force-pushed the wip_parquet branch 2 times, most recently from a67c25e to 54620b6 Compare October 2, 2019 09:08
@xuechendi
Copy link
Author

@wesm , just passed CI checks, thanks

@xuechendi
Copy link
Author

I will review the C++ part of this in more detail when I can, but as a high level comment we will need the code itself to follow the Google C++ style guide and also conform to our file naming conventions (lower_with_underscores.h/cc).

@wesm , yes, I noticed that, and I am working on to fixing the coding style problem, wish to pass CI firstly. And thanks for the code review.

@wesm , I have renamed all files and codes passed CI checks, thanks

virtual void teardown() = 0;

protected:
std::string filePath;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

naming convention for members is file_path_

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@emkornfield , thanks, fixed naming convention issue.

public:
Connector() {}
std::string getFileName() { return filePath; }
virtual ::arrow::Status openReadable(bool useHdfs3) = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

naming convention for methods is OpenReadable.

Copy link
Contributor

@emkornfield emkornfield left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did a partial review. There are still some style issues and lack of documentaiton (at a minimum classes in header files need docs).

Also I'm a little concerned about the Connector classes, it seems that they are replication filesystem / io interfaces without a clear reason why (documentation could help here). Following the pattern that ORC used here seems better. An ideal solution (would allow passing of Readers/Writers directly).

namespace jni {
namespace parquet {

class Connector {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please provide class documentation. for this class. It looks like this has nothing to do with Parquet, can it be moved someplace else (maybe under filesystem)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually how is this different then FileSystem (and for instance HdfsFileSystem (https://github.com/apache/arrow/blob/master/cpp/src/arrow/io/hdfs.h#L71)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@emkornfield , the reason of keeping connector classes under jni/parquet is that, I want to make the connector interface unified regardless its connection type(hdfs or local), while I can't find an unified class under arrow/io or arrow/filesystem, for example, hdfsFileSystem are derived from Filesystem while openReadable under file.h are derived from RandomAccessFile, and LocalFileSystem under arrow/filesystem/localfs.h does not have OpenReadable interface.
Since I hope to limit my code change to existing module, so I am figuring as what I did now to add a small wrapper firstly to make the ParquetReader and ParquetWriter functioning, then I can open a new PR to move the connector classes to arrow/filesystem and make it as a baseFileSystem class? Do you think that makes sense to you?

namespace jni {
namespace parquet {

class FileConnector : public Connector {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs documentation.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

documentation added, thanks


class Connector {
public:
Connector() {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Connector() = default; ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

class Connector {
public:
Connector() {}
std::string getFileName() { return filePath; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there should be a virtual destructor?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

std::string filePath;
std::string dirPath;
std::string getPathDir(std::string path) {
std::string delimiter = "/";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doesn't seem portable across OSes (i.e. windows)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@emkornfield , thanks, I have fixed that by using methods under arrow::filesystem

jni::parquet::ParquetReader* reader = (jni::parquet::ParquetReader*)reader_ptr;
arrow::Status status = reader->readNext(&record_batch);

if (!status.ok() || !record_batch) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a not OK status should be propagated as an exception?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@emkornfield , oh, this is not an error, this was used to capture readNextBatch has reached end of rowGroups. That is why I return nullptr here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is conflating two cases here. status isn't OK. this indicates some sort of error that should be propagated (or checked specifically to be end of stream) and end of stream (signalled by a null record_batch).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, I will split these two cases. Thanks

class ParquetReader {
public:
explicit ParquetReader(std::string path);
::arrow::Status initialize(std::vector<int>& column_indices,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these should either be const references or passed via pointer if a mutation is expected (and moved to the end of the function call).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@emkornfield , fixed, thanks

1. c++ codes are under cpp/jni/parquet, used to doing jni bridge work.
2. java codes are under java/adapter/parquet, ParquetReader and ParquetWriter is the main interface class.

Signed-off-by: Chendi Xue <chendi.xue@intel.com>
last ci check shows a glibc installation issue, try again 2

Signed-off-by: Chendi Xue <chendi.xue@intel.com>
@xuechendi xuechendi force-pushed the wip_parquet branch 3 times, most recently from f69f7be to e9e4fdd Compare October 9, 2019 03:26
Signed-off-by: Chendi Xue <chendi.xue@intel.com>
1. Fixed name convention for fileNames, methods names and variable names.
2. Added some documentations for classes and interfaces.
3. Use arrow::filesystem to get and create directory

Signed-off-by: Chendi Xue <chendi.xue@intel.com>
@xuechendi
Copy link
Author

@emkornfield , Just passed CI checks, fixed all issues you mentioned including documentation, naming convention and OSes compatibility, for the reason of adding connector classes in Jni/parquet, also replied under your review, the main reason is I don’t want to modify too much of existing codes and there is no proper unified interface to use, and I would like to do that after. Would you mind to give another round review?

@xuechendi
Copy link
Author

@kszucs, would you review again, sorry for I mis overrode your changes.

@xuechendi
Copy link
Author

@wesm and @emkornfield , do you mind to take a look and see if anything need to change? @emkornfield suggested to move connector classes to somewhere else, my original plan is to do that after this pr merged(to keep this functioning and limit its modification to existing codes in arrow), but if which blocks this PR being merged, I can commit another PR to add hdfs under arrow/filesystem by wrapping the one under arrow/io, then I can remove connector classed in this PR and use arrow/filesystem/Filesystem class then.
Any comments?

@siddharthteotia
Copy link
Contributor

For Java side, is it more advantageous to implement reader as a JNI wrapper over C++ native reader as opposed to implementing a native first class vectorized reader in Java itself? I believe going through the JNI bridge will add some performance overhead

@xuechendi
Copy link
Author

For Java side, is it more advantageous to implement reader as a JNI wrapper over C++ native reader as opposed to implementing a native first class vectorized reader in Java itself? I believe going through the JNI bridge will add some performance overhead

@siddharthteotia , I am a little confused, are you suggesting to read parquet by using some parquet java package and convert to arrow? So to make it pure java?
For your question of overhead, there is no memory copy between native memory and JVM if that's what concerns you, in my implementation, java codes only hold a reference of the data, and when its ref_cnf turned to 0, it will call nativeRelease to free allocated memory.

@ghost
Copy link

ghost commented Oct 13, 2019

I can commit another PR to add hdfs under arrow/filesystem by wrapping the one under arrow/io, then I can remove connector classed in this PR and use arrow/filesystem/Filesystem class then.
Any comments?

That actually raises an interesting point: would it make more sense to bind the new, higher-level filesystem functionality alongside the Parquet reader/writer? Presumably, a lot of people using Parquet would want both, and it is something that parquet-mr seems to lack right now.

@emkornfield
Copy link
Contributor

@xuechendi thank you for effort here. I think what I would prefer to see this broken multiple smaller PRs for this (@wesm I don't know if you have reviewed yet to see if you have any thoughts). Roughly I think there is:

  1. Factor out common JNI code to one location so code isn't duplicated (i.e. the thread safe map, assembly/disassembly or record batches).
  2. A PR that does the core Parquet Reading from local filesystem.
  3. Code that handles dispatching to multiple filesystems. I think the the way to do this is to either a FileSystemFactory or a MetaFileSystem which would handle taking a path (maybe a URL) and delegating the appropriate filesystem implementation. @pitrou do you have any thoughts on this? @lihalite I think this is probably an easier solution than fully wrapping the filesystem. Thoughts?

It would also be nice to create a MemoryPool that can use Java's allocation facilities instead of the other way around. But at for me I don't think this is a blocker.

@pitrou
Copy link
Member

pitrou commented Oct 14, 2019

At some point we'll need a filesystem URI layer in Arrow C++, so that people don't have to instantiate filesystems explicitly.


@Override
public BufferAllocator getAllocator() {
return null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't return null.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file actually is copied from the one under adapter/orc, the reason of returning null here is we didn't really need BufferAllocator here, memory is allocated by native codes. And we use this ReferenceManager class to do ref count, so when java side holds zero reference of this buffer, we can call close to free native memory.


@Override
public OwnershipTransferResult transferOwnership(ArrowBuf sourceBuffer, BufferAllocator targetAllocator) {
throw new UnsupportedOperationException();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to come with a better approach here then UnsupportedOperationException. Isn't there a noop we've defined elsewhere?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, I can check with that, thanks

/**
* ArrowBufBuilder.
*/
public class ArrowBufBuilder {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the purpose of this class? The javadoc is useless. Does it need to be package public? Is there a better name?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this class is used to create java object from jni_wrapper.cc, I followed the way from java/adapater/orc.

@xuechendi
Copy link
Author

At some point we'll need a filesystem URI layer in Arrow C++, so that people don't have to instantiate filesystems explicitly.

yes, I believe so, I will wrapper hdfs codes under arrow/io and add a new one to arrow/Filesystem, then I can use FileSystem interfaces instead create my own connectors.

Copy link
Contributor

@emkornfield emkornfield left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see comments, we should break this PR up into smaller components so we don't continue to increase technical debt in the JNI layer.

@xuechendi
Copy link
Author

see comments, we should break this PR up into smaller components so we don't continue to increase technical debt in the JNI layer.

Hi, @emkornfield , thanks for your review, sorry for my delayed response, engaged in my regular work tasks last week. I just submitted a new PR which is to add an filesystemfactory and hadoopfilesystem under arrow/filesystem, to replace the connector classed in this PR. It would be great if you can spare some time to review the codes. #5717

And I am planning to submit another two PRs later on, one for jni/parquet wrapper c++ part and one java part. Will ping you then.

@xuechendi
Copy link
Author

see comments, we should break this PR up into smaller components so we don't continue to increase technical debt in the JNI layer.

Hi, @emkornfield , thanks for your review, sorry for my delayed response, engaged in my regular work tasks last week. I just submitted a new PR which is to add an filesystemfactory and hadoopfilesystem under arrow/filesystem, to replace the connector classed in this PR. It would be great if you can spare some time to review the codes. #5717

And I am planning to submit another two PRs later on, one for jni/parquet wrapper c++ part and one java part. Will ping you then.

Also added second PR #5719 to wrap parquet codes and provide an adapter for JNI(submit later)

@emkornfield
Copy link
Contributor

@xuechendi, if I understand correctly this PR can be closed in favor of #5719?

@xuechendi xuechendi closed this Dec 12, 2019
@xuechendi
Copy link
Author

@emkornfield , yes, closed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

8 participants