ARROW-16913: [Java] Implement ArrowArrayStream #13465

Merged 5 commits on Jul 5, 2022
11 changes: 10 additions & 1 deletion cpp/src/arrow/c/bridge.cc
@@ -1748,7 +1748,9 @@ class ArrayStreamBatchReader : public RecordBatchReader {
}

~ArrayStreamBatchReader() {
- ArrowArrayStreamRelease(&stream_);
+ if (!ArrowArrayStreamIsReleased(&stream_)) {
+ ArrowArrayStreamRelease(&stream_);
+ }
DCHECK(ArrowArrayStreamIsReleased(&stream_));
}

@@ -1766,6 +1768,13 @@ class ArrayStreamBatchReader : public RecordBatchReader {
}
}

Status Close() override {
if (!ArrowArrayStreamIsReleased(&stream_)) {
ArrowArrayStreamRelease(&stream_);
}
return Status::OK();
}

private:
std::shared_ptr<Schema> CacheSchema() const {
if (!schema_) {
163 changes: 145 additions & 18 deletions docs/source/python/integration/python_java.rst
@@ -29,7 +29,7 @@ marshaling and unmarshaling data.

The article takes for granted that you have a ``Python`` environment
with ``pyarrow`` correctly installed and a ``Java`` environment with
``arrow`` library correctly installed.
The ``Arrow Java`` version must have been compiled with ``mvn -Parrow-c-data`` to
ensure CData exchange support is enabled.
See `Python Install Instructions <https://arrow.apache.org/docs/python/install.html>`_
@@ -53,7 +53,7 @@ We would save such class in the ``Simple.java`` file and proceed with
compiling it to ``Simple.class`` using ``javac Simple.java``.

Once the ``Simple.class`` file is created we can use the class
from Python using the
`JPype <https://jpype.readthedocs.io/>`_ library which
enables a Java runtime within the Python interpreter.

@@ -64,11 +64,11 @@ enables a Java runtime within the Python interpreter.
$ pip install jpype1

The most basic thing we can do with our ``Simple`` class is to
use the ``Simple.getNumber`` method from Python and see
if it will return the result.

To do so, we can create a ``simple.py`` file which uses ``jpype`` to
import the ``Simple`` class from ``Simple.class`` file and invoke
the ``Simple.getNumber`` method:

.. code-block:: python
@@ -87,7 +87,7 @@ to access the ``Java`` method and print the expected result:

.. code-block:: console

$ python simple.py
4

Java to Python using pyarrow.jvm
@@ -132,7 +132,7 @@ class, named ``FillTen.java``
}

This class provides a public ``createArray`` method that anyone can invoke
to get back an array containing numbers from 1 to 10.

Given that this class now has a dependency on a bunch of packages,
compiling it with ``javac`` is not enough anymore. We need to create
@@ -142,15 +142,15 @@ a dedicated ``pom.xml`` file where we can collect the dependencies:

<project>
<modelVersion>4.0.0</modelVersion>

<groupId>org.apache.arrow.py2java</groupId>
<artifactId>FillTen</artifactId>
<version>1</version>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
<dependency>
@@ -170,7 +170,7 @@ a dedicated ``pom.xml`` file where we can collect the dependencies:
<artifactId>arrow-vector</artifactId>
<version>8.0.0</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-c-data</artifactId>
@@ -182,22 +182,22 @@ a dedicated ``pom.xml`` file where we can collect the dependencies:

Once the ``FillTen.java`` file with the class is created
as ``src/main/java/FillTen.java`` we can use ``maven`` to
compile the project with ``mvn package`` and get it
available in the ``target`` directory.

.. code-block:: console

$ mvn package
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------< org.apache.arrow.py2java:FillTen >------------------
[INFO] Building FillTen 1
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ FillTen ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 1 source file to /experiments/java2py/target/classes
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ FillTen ---
[INFO] Building jar: /experiments/java2py/target/FillTen-1.jar
[INFO] ------------------------------------------------------------------------
@@ -215,11 +215,11 @@ We can use ``maven`` to collect all dependencies and make them available in a si

$ mvn org.apache.maven.plugins:maven-dependency-plugin:2.7:copy-dependencies -DoutputDirectory=dependencies
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------< org.apache.arrow.py2java:FillTen >------------------
[INFO] Building FillTen 1
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-dependency-plugin:2.7:copy-dependencies (default-cli) @ FillTen ---
[INFO] Copying jsr305-3.0.2.jar to /experiments/java2py/dependencies/jsr305-3.0.2.jar
[INFO] Copying netty-common-4.1.72.Final.jar to /experiments/java2py/dependencies/netty-common-4.1.72.Final.jar
@@ -246,9 +246,9 @@ We can use ``maven`` to collect all dependencies and make them available in a si
Instead of manually collecting dependencies, you could also rely on the
``maven-assembly-plugin`` to build a single ``jar`` with all dependencies.

Once our package and all its dependencies are available,
we can invoke it from a ``fillten_pyarrowjvm.py`` script that
imports the ``FillTen`` class and prints the result of invoking ``FillTen.createArray``:

.. code-block:: python

Expand Down Expand Up @@ -291,7 +291,7 @@ Running the python script will lead to two lines getting printed:

The first line is the raw result of invoking the ``FillTen.createArray`` method.
The resulting object is a proxy to the actual Java object, so it's not really a pyarrow
Array and lacks most of its capabilities and methods.
That's why we subsequently use ``pyarrow.jvm.array`` to convert it to an actual
``pyarrow`` array. That allows us to treat it like any other ``pyarrow`` array.
The result is the second line in the output where the array is correctly reported
@@ -441,3 +441,130 @@ values printed by the Python script have been properly changed by the Java code:
9,
10
]

We can also use the C Stream Interface to exchange
:py:class:`pyarrow.RecordBatchReader` objects between Java and Python. We'll
use the following Java class as a demo. It can read an Arrow IPC file
with Java's implementation and export it as a stream, or import a stream
and write its data to a JSON file:

.. code-block:: java

    import java.io.File;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    import org.apache.arrow.c.ArrowArrayStream;
    import org.apache.arrow.c.Data;
    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.ipc.ArrowFileReader;
    import org.apache.arrow.vector.ipc.ArrowReader;
    import org.apache.arrow.vector.ipc.JsonFileWriter;

    public class PythonInteropDemo implements AutoCloseable {
      private final BufferAllocator allocator;

      public PythonInteropDemo() {
        this.allocator = new RootAllocator();
      }

      public void exportStream(String path, long cStreamPointer) throws Exception {
        try (final ArrowArrayStream stream = ArrowArrayStream.wrap(cStreamPointer)) {
          ArrowFileReader reader = new ArrowFileReader(Files.newByteChannel(Paths.get(path)), allocator);
          Data.exportArrayStream(allocator, reader, stream);
        }
      }

      public void importStream(String path, long cStreamPointer) throws Exception {
        try (final ArrowArrayStream stream = ArrowArrayStream.wrap(cStreamPointer);
             final ArrowReader input = Data.importArrayStream(allocator, stream);
             JsonFileWriter writer = new JsonFileWriter(new File(path))) {
          writer.start(input.getVectorSchemaRoot().getSchema(), input);
          while (input.loadNextBatch()) {
            writer.write(input.getVectorSchemaRoot());
          }
        }
      }

      @Override
      public void close() throws Exception {
        allocator.close();
      }
    }

On the Python side, we'll use JPype as before, except this time we'll
send RecordBatchReaders back and forth:

.. code-block:: python

    import tempfile

    import jpype
    import jpype.imports
    from jpype.types import *

    # Init the JVM and make demo class available to Python.
    jpype.startJVM(classpath=["./dependencies/*", "./target/*"])
    PythonInteropDemo = JClass("PythonInteropDemo")
    demo = PythonInteropDemo()

    # Create a Python record batch reader
    import pyarrow as pa
    schema = pa.schema([
        ("ints", pa.int64()),
        ("strs", pa.string())
    ])
    batches = [
        pa.record_batch([
            [0, 2, 4, 8],
            ["a", "b", "c", None],
        ], schema=schema),
        pa.record_batch([
            [None, 32, 64, None],
            ["e", None, None, "h"],
        ], schema=schema),
    ]
    reader = pa.RecordBatchReader.from_batches(schema, batches)

    from pyarrow.cffi import ffi as arrow_c

    # Export the Python reader through C Data
    c_stream = arrow_c.new("struct ArrowArrayStream*")
    c_stream_ptr = int(arrow_c.cast("uintptr_t", c_stream))
    reader._export_to_c(c_stream_ptr)

    # Send reader to the Java function that writes a JSON file
    with tempfile.NamedTemporaryFile() as temp:
        demo.importStream(temp.name, c_stream_ptr)

        # Read the JSON file back
        with open(temp.name) as source:
            print("JSON file written by Java:")
            print(source.read())


    # Write an Arrow IPC file for Java to read
    with tempfile.NamedTemporaryFile() as temp:
        with pa.ipc.new_file(temp.name, schema) as sink:
            for batch in batches:
                sink.write_batch(batch)

        demo.exportStream(temp.name, c_stream_ptr)
        with pa.RecordBatchReader._import_from_c(c_stream_ptr) as source:
            print("IPC file read by Java:")
            print(source.read_all())

Building the Java project and running the JPype script, saved as ``demo.py``, prints:

.. code-block:: console

    $ mvn package
    $ mvn org.apache.maven.plugins:maven-dependency-plugin:2.7:copy-dependencies -DoutputDirectory=dependencies
    $ python demo.py
    JSON file written by Java:
    {"schema":{"fields":[{"name":"ints","nullable":true,"type":{"name":"int","bitWidth":64,"isSigned":true},"children":[]},{"name":"strs","nullable":true,"type":{"name":"utf8"},"children":[]}]},"batches":[{"count":4,"columns":[{"name":"ints","count":4,"VALIDITY":[1,1,1,1],"DATA":["0","2","4","8"]},{"name":"strs","count":4,"VALIDITY":[1,1,1,0],"OFFSET":[0,1,2,3,3],"DATA":["a","b","c",""]}]},{"count":4,"columns":[{"name":"ints","count":4,"VALIDITY":[0,1,1,0],"DATA":["0","32","64","0"]},{"name":"strs","count":4,"VALIDITY":[1,0,0,1],"OFFSET":[0,1,1,1,2],"DATA":["e","","","h"]}]}]}
    IPC file read by Java:
    pyarrow.Table
    ints: int64
    strs: string
    ----
    ints: [[0,2,4,8],[null,32,64,null]]
    strs: [["a","b","c",null],["e",null,null,"h"]]
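
In the JSON output, null slots are not omitted. Each column carries a ``VALIDITY`` list next to a ``DATA`` list, and a null slot holds a placeholder such as ``"0"``. The following is a minimal sketch, using only Python's ``json`` module and a fragment copied from the output above, of turning such a column back into Python values. The helper name ``decode_int_column`` is hypothetical and not part of any Arrow API:

```python
import json

# The "ints" column of the second batch, copied from the JSON output above.
column_json = (
    '{"name":"ints","count":4,'
    '"VALIDITY":[0,1,1,0],"DATA":["0","32","64","0"]}'
)


def decode_int_column(column):
    """Hypothetical helper: pair VALIDITY with DATA, mapping invalid slots to None."""
    return [
        int(value) if valid else None
        for valid, value in zip(column["VALIDITY"], column["DATA"])
    ]


values = decode_int_column(json.loads(column_json))
print(values)
```

The decoded list matches the second chunk of the ``ints`` column that the script prints for the IPC file.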
1 change: 1 addition & 0 deletions java/c/CMakeLists.txt
@@ -35,6 +35,7 @@ include_directories(${CMAKE_CURRENT_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR}
${JNI_INCLUDE_DIRS} ${JNI_HEADERS_DIR})

add_jar(${PROJECT_NAME}
src/main/java/org/apache/arrow/c/jni/CDataJniException.java
src/main/java/org/apache/arrow/c/jni/JniLoader.java
src/main/java/org/apache/arrow/c/jni/JniWrapper.java
src/main/java/org/apache/arrow/c/jni/PrivateData.java
5 changes: 5 additions & 0 deletions java/c/pom.xml
@@ -62,6 +62,11 @@
<version>${dep.guava.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<resources>