Skip to content

Commit

Permalink
ARROW-16913: [Java] Implement ArrowArrayStream (apache#13465)
Browse files Browse the repository at this point in the history
Implements ArrowArrayStream for Java. The equivalent Java-side interface chosen is ArrowReader.

Also:
- Fixes a couple of JDK9 compatibility issues I ran into. I _think_ these will not normally affect people except during development (I think because I was mixing IntelliJ and Maven).
- Manually clang-format the C++ code. Clean up some things to match Arrow convention and remove some unused declarations.
- Extends the DictionaryProvider interface. This is a potentially breaking change; we could make the method default (and raise an exception) instead.

Authored-by: David Li <li.davidm96@gmail.com>
Signed-off-by: Alessandro Molina <amol@turbogears.org>
  • Loading branch information
lidavidm authored and drin committed Jul 5, 2022
1 parent 13486b4 commit 45ea4ed
Show file tree
Hide file tree
Showing 23 changed files with 1,356 additions and 62 deletions.
11 changes: 10 additions & 1 deletion cpp/src/arrow/c/bridge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1748,7 +1748,9 @@ class ArrayStreamBatchReader : public RecordBatchReader {
}

~ArrayStreamBatchReader() {
ArrowArrayStreamRelease(&stream_);
if (!ArrowArrayStreamIsReleased(&stream_)) {
ArrowArrayStreamRelease(&stream_);
}
DCHECK(ArrowArrayStreamIsReleased(&stream_));
}

Expand All @@ -1766,6 +1768,13 @@ class ArrayStreamBatchReader : public RecordBatchReader {
}
}

Status Close() override {
if (!ArrowArrayStreamIsReleased(&stream_)) {
ArrowArrayStreamRelease(&stream_);
}
return Status::OK();
}

private:
std::shared_ptr<Schema> CacheSchema() const {
if (!schema_) {
Expand Down
163 changes: 145 additions & 18 deletions docs/source/python/integration/python_java.rst
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ marshaling and unmarshaling data.

The article takes for granted that you have a ``Python`` environment
with ``pyarrow`` correctly installed and a ``Java`` environment with
``arrow`` library correctly installed.
``arrow`` library correctly installed.
The ``Arrow Java`` version must have been compiled with ``mvn -Parrow-c-data`` to
ensure CData exchange support is enabled.
See `Python Install Instructions <https://arrow.apache.org/docs/python/install.html>`_
Expand All @@ -53,7 +53,7 @@ We would save such class in the ``Simple.java`` file and proceed with
compiling it to ``Simple.class`` using ``javac Simple.java``.

Once the ``Simple.class`` file is created we can use the class
from Python using the
from Python using the
`JPype <https://jpype.readthedocs.io/>`_ library which
enables a Java runtime within the Python interpreter.

Expand All @@ -64,11 +64,11 @@ enables a Java runtime within the Python interpreter.
$ pip install jpype1
The most basic thing we can do with our ``Simple`` class is to
use the ``Simple.getNumber`` method from Python and see
use the ``Simple.getNumber`` method from Python and see
if it will return the result.

To do so, we can create a ``simple.py`` file which uses ``jpype`` to
import the ``Simple`` class from ``Simple.class`` file and invoke
import the ``Simple`` class from ``Simple.class`` file and invoke
the ``Simple.getNumber`` method:

.. code-block:: python
Expand All @@ -87,7 +87,7 @@ to access the ``Java`` method and print the expected result:

.. code-block:: console
$ python simple.py
$ python simple.py
4
Java to Python using pyarrow.jvm
Expand Down Expand Up @@ -132,7 +132,7 @@ class, named ``FillTen.java``
}
This class provides a public ``createArray`` method that anyone can invoke
to get back an array containing numbers from 1 to 10.
to get back an array containing numbers from 1 to 10.

Given that this class now has a dependency on a bunch of packages,
compiling it with ``javac`` is not enough anymore. We need to create
Expand All @@ -142,15 +142,15 @@ a dedicated ``pom.xml`` file where we can collect the dependencies:
<project>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.arrow.py2java</groupId>
<artifactId>FillTen</artifactId>
<version>1</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
</properties>
<dependencies>
<dependency>
Expand All @@ -170,7 +170,7 @@ a dedicated ``pom.xml`` file where we can collect the dependencies:
<artifactId>arrow-vector</artifactId>
<version>8.0.0</version>
<type>pom</type>
</dependency>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-c-data</artifactId>
Expand All @@ -182,22 +182,22 @@ a dedicated ``pom.xml`` file where we can collect the dependencies:
Once the ``FillTen.java`` file with the class is created
as ``src/main/java/FillTen.java`` we can use ``maven`` to
compile the project with ``mvn package`` and get it
compile the project with ``mvn package`` and get it
available in the ``target`` directory.

.. code-block:: console
$ mvn package
[INFO] Scanning for projects...
[INFO]
[INFO]
[INFO] ------------------< org.apache.arrow.py2java:FillTen >------------------
[INFO] Building FillTen 1
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ FillTen ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 1 source file to /experiments/java2py/target/classes
[INFO]
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ FillTen ---
[INFO] Building jar: /experiments/java2py/target/FillTen-1.jar
[INFO] ------------------------------------------------------------------------
Expand All @@ -215,11 +215,11 @@ We can use ``maven`` to collect all dependencies and make them available in a si
$ mvn org.apache.maven.plugins:maven-dependency-plugin:2.7:copy-dependencies -DoutputDirectory=dependencies
[INFO] Scanning for projects...
[INFO]
[INFO]
[INFO] ------------------< org.apache.arrow.py2java:FillTen >------------------
[INFO] Building FillTen 1
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO]
[INFO] --- maven-dependency-plugin:2.7:copy-dependencies (default-cli) @ FillTen ---
[INFO] Copying jsr305-3.0.2.jar to /experiments/java2py/dependencies/jsr305-3.0.2.jar
[INFO] Copying netty-common-4.1.72.Final.jar to /experiments/java2py/dependencies/netty-common-4.1.72.Final.jar
Expand All @@ -246,9 +246,9 @@ We can use ``maven`` to collect all dependencies and make them available in a si
Instead of manually collecting dependencies, you could also rely on the
``maven-assembly-plugin`` to build a single ``jar`` with all dependencies.

Once our package and all its depdendencies are available,
Once our package and all its depdendencies are available,
we can invoke it from ``fillten_pyarrowjvm.py`` script that will
import the ``FillTen`` class and print out the result of invoking ``FillTen.createArray``
import the ``FillTen`` class and print out the result of invoking ``FillTen.createArray``

.. code-block:: python
Expand Down Expand Up @@ -291,7 +291,7 @@ Running the python script will lead to two lines getting printed:
The first line is the raw result of invoking the ``FillTen.createArray`` method.
The resulting object is a proxy to the actual Java object, so it's not really a pyarrow
Array, it will lack most of its capabilities and methods.
Array, it will lack most of its capabilities and methods.
That's why we subsequently use ``pyarrow.jvm.array`` to convert it to an actual
``pyarrow`` array. That allows us to treat it like any other ``pyarrow`` array.
The result is the second line in the output where the array is correctly reported
Expand Down Expand Up @@ -441,3 +441,130 @@ values printed by the Python script have been properly changed by the Java code:
9,
10
]
We can also use the C Stream Interface to exchange
:py:class:`pyarrow.RecordBatchReader`s between Java and Python. We'll
use this Java class as a demo, which lets you read an Arrow IPC file
via Java's implementation, or write data to a JSON file:
.. code-block:: java
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.arrow.c.ArrowArrayStream;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.JsonFileWriter;
public class PythonInteropDemo implements AutoCloseable {
private final BufferAllocator allocator;
public PythonInteropDemo() {
this.allocator = new RootAllocator();
}
public void exportStream(String path, long cStreamPointer) throws Exception {
try (final ArrowArrayStream stream = ArrowArrayStream.wrap(cStreamPointer)) {
ArrowFileReader reader = new ArrowFileReader(Files.newByteChannel(Paths.get(path)), allocator);
Data.exportArrayStream(allocator, reader, stream);
}
}
public void importStream(String path, long cStreamPointer) throws Exception {
try (final ArrowArrayStream stream = ArrowArrayStream.wrap(cStreamPointer);
final ArrowReader input = Data.importArrayStream(allocator, stream);
JsonFileWriter writer = new JsonFileWriter(new File(path))) {
writer.start(input.getVectorSchemaRoot().getSchema(), input);
while (input.loadNextBatch()) {
writer.write(input.getVectorSchemaRoot());
}
}
}
@Override
public void close() throws Exception {
allocator.close();
}
}
On the Python side, we'll use JPype as before, except this time we'll
send RecordBatchReaders back and forth:
.. code-block:: python
import tempfile
import jpype
import jpype.imports
from jpype.types import *
# Init the JVM and make demo class available to Python.
jpype.startJVM(classpath=["./dependencies/*", "./target/*"])
PythonInteropDemo = JClass("PythonInteropDemo")
demo = PythonInteropDemo()
# Create a Python record batch reader
import pyarrow as pa
schema = pa.schema([
("ints", pa.int64()),
("strs", pa.string())
])
batches = [
pa.record_batch([
[0, 2, 4, 8],
["a", "b", "c", None],
], schema=schema),
pa.record_batch([
[None, 32, 64, None],
["e", None, None, "h"],
], schema=schema),
]
reader = pa.RecordBatchReader.from_batches(schema, batches)
from pyarrow.cffi import ffi as arrow_c
# Export the Python reader through C Data
c_stream = arrow_c.new("struct ArrowArrayStream*")
c_stream_ptr = int(arrow_c.cast("uintptr_t", c_stream))
reader._export_to_c(c_stream_ptr)
# Send reader to the Java function that writes a JSON file
with tempfile.NamedTemporaryFile() as temp:
demo.importStream(temp.name, c_stream_ptr)
# Read the JSON file back
with open(temp.name) as source:
print("JSON file written by Java:")
print(source.read())
# Write an Arrow IPC file for Java to read
with tempfile.NamedTemporaryFile() as temp:
with pa.ipc.new_file(temp.name, schema) as sink:
for batch in batches:
sink.write_batch(batch)
demo.exportStream(temp.name, c_stream_ptr)
with pa.RecordBatchReader._import_from_c(c_stream_ptr) as source:
print("IPC file read by Java:")
print(source.read_all())
.. code-block:: console
$ mvn package
$ mvn org.apache.maven.plugins:maven-dependency-plugin:2.7:copy-dependencies -DoutputDirectory=dependencies
$ python demo.py
JSON file written by Java:
{"schema":{"fields":[{"name":"ints","nullable":true,"type":{"name":"int","bitWidth":64,"isSigned":true},"children":[]},{"name":"strs","nullable":true,"type":{"name":"utf8"},"children":[]}]},"batches":[{"count":4,"columns":[{"name":"ints","count":4,"VALIDITY":[1,1,1,1],"DATA":["0","2","4","8"]},{"name":"strs","count":4,"VALIDITY":[1,1,1,0],"OFFSET":[0,1,2,3,3],"DATA":["a","b","c",""]}]},{"count":4,"columns":[{"name":"ints","count":4,"VALIDITY":[0,1,1,0],"DATA":["0","32","64","0"]},{"name":"strs","count":4,"VALIDITY":[1,0,0,1],"OFFSET":[0,1,1,1,2],"DATA":["e","","","h"]}]}]}
IPC file read by Java:
pyarrow.Table
ints: int64
strs: string
----
ints: [[0,2,4,8],[null,32,64,null]]
strs: [["a","b","c",null],["e",null,null,"h"]]
1 change: 1 addition & 0 deletions java/c/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ include_directories(${CMAKE_CURRENT_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR}
${JNI_INCLUDE_DIRS} ${JNI_HEADERS_DIR})

add_jar(${PROJECT_NAME}
src/main/java/org/apache/arrow/c/jni/CDataJniException.java
src/main/java/org/apache/arrow/c/jni/JniLoader.java
src/main/java/org/apache/arrow/c/jni/JniWrapper.java
src/main/java/org/apache/arrow/c/jni/PrivateData.java
Expand Down
5 changes: 5 additions & 0 deletions java/c/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@
<version>${dep.guava.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<resources>
Expand Down
Loading

0 comments on commit 45ea4ed

Please sign in to comment.