Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-34223: [Java] Java Substrait Consumer JNI call to ACERO C++ #34227

Merged
merged 53 commits into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
a0aac46
feat: consume Substrait Plan
davisusanibar Mar 9, 2023
0d91f09
fix: solving maven-dependency-plugin
davisusanibar Mar 13, 2023
0599dc2
feat: add support for execution of Substrait binary plans also
davisusanibar Mar 15, 2023
c794ae5
Upgrade to Java 11 to be able to consume Isthmus library
davisusanibar Mar 15, 2023
8cc5443
fix: profile to Java test with JDK11 (be able to consume Isthmus libr…
davisusanibar Mar 15, 2023
e5594f8
fix: solve error to call Isthmus by Dataset that use JDK8
davisusanibar Mar 15, 2023
223ddef
fix: detected both log4j-over-slf4j.jar AND bound slf4j-reload4j.jar …
davisusanibar Mar 16, 2023
795e619
fix: rollback changes on orc
davisusanibar Mar 16, 2023
3bd18f1
Merge branch 'main' into poc-substrait
davisusanibar Mar 16, 2023
088a101
fix: able to compile main source with jdk8 and test with jdk11
davisusanibar Mar 16, 2023
ba23e44
fix: able to compile main source with jdk8 and test with jdk11
davisusanibar Mar 16, 2023
8655815
fix: JAVA_HOME_11_X64: command not found
davisusanibar Mar 16, 2023
d22d6b1
fix: partial comments fix
davisusanibar Mar 19, 2023
f0d8a25
Update java/dataset/src/main/cpp/jni_util.h
davisusanibar Mar 20, 2023
632f90d
Update java/dataset/src/main/java/org/apache/arrow/dataset/substrait/…
davisusanibar Mar 20, 2023
9437f4e
fix: comments
davisusanibar Mar 21, 2023
61d6ee7
fix: comments
davisusanibar Mar 21, 2023
64c7607
fix: comments
davisusanibar Mar 22, 2023
721fe01
fix: hash boost_1_81_0 does not match expected value
davisusanibar Mar 22, 2023
b3c2e1e
fix: maven-shade-plugin:jar:3.1.1 -> org.ow2.asm:asm:jar:6.0: Failed …
davisusanibar Mar 22, 2023
f5596c9
Merge branch 'main' into poc-substrait
davisusanibar Mar 22, 2023
388446b
Merge branch 'main' into poc-substrait
davisusanibar Mar 28, 2023
ead80a8
fix: clean unit test, fix comments
davisusanibar Mar 28, 2023
0446453
fix: clean substrait method to get plan
davisusanibar Mar 28, 2023
8c57c16
fix: clean sout
davisusanibar Mar 28, 2023
766b383
fix: rollback maven-shade-plugin
davisusanibar Mar 28, 2023
5e8b887
fix: failures test
davisusanibar Mar 29, 2023
7f59fbd
fix: delete methods not needed, create files of substrait plan
davisusanibar Mar 30, 2023
0d2bcf8
fix: npe read resources
davisusanibar Mar 30, 2023
4380932
fix: add resources files for nosuchfile error
davisusanibar Mar 30, 2023
9bbe4fb
fix: add resources files for nosuchfile error
davisusanibar Mar 30, 2023
5351ee1
fix: update rst documentation
davisusanibar Mar 30, 2023
e966d32
Apply suggestions from code review
davisusanibar Mar 31, 2023
cfe4061
fix: code review
davisusanibar Mar 31, 2023
2419896
Merge branch 'main' into poc-substrait
davisusanibar Apr 2, 2023
8811bc6
Merge branch 'main' into poc-substrait
davisusanibar Apr 2, 2023
ead4784
fix: rebase and changes to consider new arrow acero
davisusanibar Apr 3, 2023
9bfa15c
fix: solving PR comments
davisusanibar Apr 6, 2023
8a0eae6
Merge branch 'main' into poc-substrait
davisusanibar Apr 6, 2023
87e75eb
fix: solving PR comments
davisusanibar Apr 6, 2023
812921f
Merge branch 'main' into poc-substrait
davisusanibar Apr 10, 2023
89060eb
fix: rebase
davisusanibar Apr 10, 2023
33c634f
Update java/dataset/src/main/java/org/apache/arrow/dataset/substrait/…
davisusanibar Apr 11, 2023
34979a5
fix: comment on code review
davisusanibar Apr 11, 2023
1a6f0e5
fix: comment on code review
davisusanibar Apr 11, 2023
e388be5
fix: validate input on arrow Table associated with a given table name
davisusanibar Apr 12, 2023
8eb3e40
fix: code review
davisusanibar Apr 13, 2023
ce7800b
Merge branch 'main' into poc-substrait
davisusanibar May 12, 2023
fdd042b
Merge branch 'main' into poc-substrait
davisusanibar May 22, 2023
3dddea0
fix: solve code review comments
davisusanibar May 23, 2023
6bdae18
fix: solve code review comments
davisusanibar May 23, 2023
2d9fc84
fix: solve code review comments
davisusanibar May 24, 2023
9b5f0cb
fix: solve code review comments
davisusanibar May 24, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions ci/scripts/java_jni_macos_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ cmake \
-DARROW_BUILD_TESTS=${ARROW_BUILD_TESTS} \
-DARROW_CSV=${ARROW_DATASET} \
-DARROW_DATASET=${ARROW_DATASET} \
-DARROW_SUBSTRAIT=${ARROW_DATASET} \
-DARROW_DEPENDENCY_USE_SHARED=OFF \
-DARROW_GANDIVA=${ARROW_GANDIVA} \
-DARROW_GANDIVA_STATIC_LIBSTDCPP=ON \
Expand Down
1 change: 1 addition & 0 deletions ci/scripts/java_jni_manylinux_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ cmake \
-DARROW_BUILD_TESTS=ON \
-DARROW_CSV=${ARROW_DATASET} \
-DARROW_DATASET=${ARROW_DATASET} \
-DARROW_SUBSTRAIT=${ARROW_DATASET} \
-DARROW_DEPENDENCY_SOURCE="VCPKG" \
-DARROW_DEPENDENCY_USE_SHARED=OFF \
-DARROW_GANDIVA_PC_CXX_FLAGS=${GANDIVA_CXX_FLAGS} \
Expand Down
1 change: 1 addition & 0 deletions ci/scripts/java_jni_windows_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ cmake \
-DARROW_BUILD_TESTS=ON \
-DARROW_CSV=${ARROW_DATASET} \
-DARROW_DATASET=${ARROW_DATASET} \
-DARROW_SUBSTRAIT=${ARROW_DATASET} \
-DARROW_DEPENDENCY_USE_SHARED=OFF \
-DARROW_ORC=${ARROW_ORC} \
-DARROW_PARQUET=${ARROW_PARQUET} \
Expand Down
1 change: 1 addition & 0 deletions docs/source/java/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ on the Arrow format and other language bindings see the :doc:`parent documentati
flight_sql
flight_sql_jdbc_driver
dataset
substrait
cdata
jdbc
Reference (javadoc) <reference/index>
120 changes: 120 additions & 0 deletions docs/source/java/substrait.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
.. Licensed to the Apache Software Foundation (ASF) under one
.. or more contributor license agreements. See the NOTICE file
.. distributed with this work for additional information
.. regarding copyright ownership. The ASF licenses this file
.. to you under the Apache License, Version 2.0 (the
.. "License"); you may not use this file except in compliance
.. with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
.. software distributed under the License is distributed on an
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
.. KIND, either express or implied. See the License for the
.. specific language governing permissions and limitations
.. under the License.

=========
Substrait
=========

Java Substrait offer capabilities to ``Query`` data received as a Susbtrait
Plan (`plain or binary format`).

During this process, Substrait plans are read, executed, and ArrowReaders are
returned for reading Schema and ArrowRecordBatches. For Substrait plan that contains
``Local Files`` the URI per table are defined in the Substrait plan, different
than ``Named Tables`` where is needed to define a mapping name of tables
and theirs ArrowReader representation.
davisusanibar marked this conversation as resolved.
Show resolved Hide resolved

.. contents::

Getting Started
===============

Java Substrait API uses Acero C++ Substrait API capabilities thru JNI wrappers.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This section is not useful. Replace it with installation instructions or remove it.


.. seealso:: :doc:`../cpp/streaming_execution` for more information on Acero.

Substrait Consumer
==================
davisusanibar marked this conversation as resolved.
Show resolved Hide resolved

Substrait Plan offer two ways to define URI for Query data:

- Local Files: A fixed URI value on the plan
- Named Table: An external configuration to define URI value
davisusanibar marked this conversation as resolved.
Show resolved Hide resolved

Local Files:

.. code-block:: json

"local_files": {
"items": [
{
"uri_file": "file:///tmp/opt/lineitem.parquet",
"parquet": {}
}
]
}

Named Table:

.. code-block:: json

"namedTable": {
"names": ["LINEITEM"]
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to be a Substrait tutorial?


Here is an example of a Java program that queries a Parquet file using Java Substrait:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clarify that this example uses a third party library to compile a SQL query to a Substrait plan.


.. code-block:: Java

// Query: SELECT * from nation
String uri = "file:///data/tpch_parquet/nation.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
DatasetFactory datasetFactory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
FileFormat.PARQUET, uri);
Dataset dataset = datasetFactory.finish();
Scanner scanner = dataset.newScan(options);
ArrowReader reader = scanner.scanBatches()
) {
// map table to reader
Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
mapTableToArrowReader.put("NATION", reader);
// get binary plan
String sql = "SELECT * from nation";
String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, N_NAME CHAR(25), " +
"N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
Plan plan = getPlan(sql, ImmutableList.of(nation));
ByteBuffer substraitPlan = ByteBuffer.allocateDirect(plan.toByteArray().length);
substraitPlan.put(plan.toByteArray());
// run query
try (ArrowReader arrowReader = new SubstraitAceroConsumer(rootAllocator()).runQuery(
substraitPlan,
mapTableToArrowReader
)) {
while (arrowReader.loadNextBatch()) {
assertEquals(arrowReader.getVectorSchemaRoot().getRowCount(), 25);
assertTrue(arrowReader.getVectorSchemaRoot().contentToTSVString().contains("MOROCCO"));
}
}
} catch (Exception e) {
e.printStackTrace();
}

.. code-block:: text

// Results example:
FieldPath(0) FieldPath(1) FieldPath(2) FieldPath(3)
0 ALGERIA 0 haggle. carefully final deposits detect slyly agai
1 ARGENTINA 1 al foxes promise slyly according to the regular accounts. bold requests alon

Substrait Producer
==================

The following options are available for producing Substrait Plans: Acero,
Isthmus, Ibis, DuckDB, others.

You can generate Substrait plans and then send them to Java Substrait for consumption.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this is useful.

9 changes: 7 additions & 2 deletions java/dataset/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.

find_package(ArrowDataset REQUIRED)
find_package(ArrowSubstrait REQUIRED)

include_directories(${CMAKE_CURRENT_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR}
${JNI_INCLUDE_DIRS} ${JNI_HEADERS_DIR})
Expand All @@ -26,14 +27,18 @@ add_jar(arrow_java_jni_dataset_jar
src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
src/main/java/org/apache/arrow/dataset/jni/NativeMemoryPool.java
src/main/java/org/apache/arrow/dataset/jni/ReservationListener.java
src/main/java/org/apache/arrow/dataset/substrait/JniWrapper.java
GENERATE_NATIVE_HEADERS
arrow_java_jni_dataset_headers)

add_library(arrow_java_jni_dataset SHARED src/main/cpp/jni_wrapper.cc
src/main/cpp/jni_util.cc)
set_property(TARGET arrow_java_jni_dataset PROPERTY OUTPUT_NAME "arrow_dataset_jni")
target_link_libraries(arrow_java_jni_dataset arrow_java_jni_dataset_headers jni
ArrowDataset::arrow_dataset_static)
target_link_libraries(arrow_java_jni_dataset
arrow_java_jni_dataset_headers
jni
ArrowDataset::arrow_dataset_static
ArrowSubstrait::arrow_substrait_static)

if(BUILD_TESTING)
add_executable(arrow-java-jni-dataset-test src/main/cpp/jni_util_test.cc
Expand Down
122 changes: 122 additions & 0 deletions java/dataset/src/main/cpp/jni_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

#include <mutex>
#include <unordered_map>

#include "arrow/array.h"
#include "arrow/array/concatenate.h"
Expand All @@ -24,12 +25,14 @@
#include "arrow/dataset/api.h"
#include "arrow/dataset/file_base.h"
#include "arrow/filesystem/localfs.h"
#include "arrow/engine/substrait/util.h"
#include "arrow/ipc/api.h"
#include "arrow/util/iterator.h"
#include "jni_util.h"
#include "org_apache_arrow_dataset_file_JniWrapper.h"
#include "org_apache_arrow_dataset_jni_JniWrapper.h"
#include "org_apache_arrow_dataset_jni_NativeMemoryPool.h"
#include "org_apache_arrow_dataset_substrait_JniWrapper.h"

namespace {

Expand Down Expand Up @@ -261,6 +264,26 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
default_memory_pool_id = -1L;
}

/// Iterate over an object array of Tables Name on position `i` and theirs
/// Memory Address representation on `i+1` position linearly.
/// Return a mapping of the Table Name to Query as a Key and a RecordBatchReader
/// as a Value.
davisusanibar marked this conversation as resolved.
Show resolved Hide resolved
std::unordered_map<std::string, std::shared_ptr<arrow::Table>> ToMapTableToArrowReader(JNIEnv* env, jobjectArray& str_array) {
davisusanibar marked this conversation as resolved.
Show resolved Hide resolved
std::unordered_map<std::string, std::shared_ptr<arrow::Table>> map_table_to_record_batch_reader;
int length = env->GetArrayLength(str_array);
std::shared_ptr<arrow::Table> output_table;
for (int pos = 0; pos < length; pos++) {
lidavidm marked this conversation as resolved.
Show resolved Hide resolved
auto j_string_key = reinterpret_cast<jstring>(env->GetObjectArrayElement(str_array, pos));
pos++;
auto j_string_value = reinterpret_cast<jstring>(env->GetObjectArrayElement(str_array, pos));
auto* arrow_stream_in = reinterpret_cast<ArrowArrayStream*>(std::stol(JStringToCString(env, j_string_value)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should catch errors from stol.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is internal evaluation, not exposed to the user to manipulate or setup that. Instead of that if needed try/catch this errors from stol?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should catch possible errors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

std::shared_ptr<arrow::RecordBatchReader> readerIn = JniGetOrThrow(arrow::ImportRecordBatchReader(arrow_stream_in));
output_table = JniGetOrThrow(readerIn->ToTable());
map_table_to_record_batch_reader[JStringToCString(env, j_string_key)] = output_table;
}
return map_table_to_record_batch_reader;
}

/*
* Class: org_apache_arrow_dataset_jni_NativeMemoryPool
* Method: getDefaultMemoryPool
Expand Down Expand Up @@ -578,3 +601,102 @@ Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, scanner));
JNI_METHOD_END()
}

/*
* Class: org_apache_arrow_dataset_substrait_JniWrapper
* Method: executeSerializedPlanLocalFiles
* Signature: (Ljava/lang/String;J)V
*/
JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanLocalFiles__Ljava_lang_String_2J (
JNIEnv* env, jobject, jstring plan, jlong c_arrow_array_stream_address_out) {
JNI_METHOD_START
auto* arrow_stream = reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address_out);
std::shared_ptr<arrow::Buffer> buffer = JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, plan)));
std::shared_ptr<arrow::RecordBatchReader> reader = JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer));
JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader, arrow_stream));
JNI_METHOD_END()
}

/*
* Class: org_apache_arrow_dataset_substrait_JniWrapper
* Method: executeSerializedPlanLocalFiles
* Signature: (Ljava/nio/ByteBuffer;J)V
*/
JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanLocalFiles__Ljava_nio_ByteBuffer_2J (
JNIEnv* env, jobject, jobject plan, jlong c_arrow_array_stream_address_out) {
JNI_METHOD_START
auto* arrow_stream = reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address_out);
// mapping arrow::Buffer
auto *buff = reinterpret_cast<jbyte*>(env->GetDirectBufferAddress(plan));
int length = env->GetDirectBufferCapacity(plan);
std::shared_ptr<arrow::Buffer> buffer = JniGetOrThrow(arrow::AllocateBuffer(length));
std::memcpy(buffer->mutable_data(), buff, length);
// execute plan
std::shared_ptr<arrow::RecordBatchReader> reader = JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer));
JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader, arrow_stream));
JNI_METHOD_END()
}

/*
* Class: org_apache_arrow_dataset_substrait_JniWrapper
* Method: executeSerializedPlanNamedTables
* Signature: (Ljava/lang/String;[Ljava/lang/String;J)V
*/
JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanNamedTables__Ljava_lang_String_2_3Ljava_lang_String_2J (
JNIEnv* env, jobject, jstring plan, jobjectArray table_to_memory_address_input, jlong memory_address_output) {
JNI_METHOD_START
// get mapping of table name to memory address
std::unordered_map<std::string, std::shared_ptr<arrow::Table>> map_table_to_reader = ToMapTableToArrowReader(env, table_to_memory_address_input);
// create table provider
arrow::engine::NamedTableProvider table_provider = [&map_table_to_reader](const std::vector<std::string>& names, const arrow::Schema&) {
std::shared_ptr<arrow::Table> output_table;
for (const auto& name : names) {
output_table = map_table_to_reader[name];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should check if the table exists.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also needs to be unit tested.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this loop break if there's more than one table?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This loop iterate over the different tables defined on the Substrait plan, then for any Table that appear on the plan tray to assign the Table Data mapping by the user.

Yes, this could cause an error, if the user map the same Table Name to different Table Data then only one of the will be used.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should check if the table exists.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, what I mean is that if we have more than one entry in names, we're just overwriting the same output_table multiple times. That seems wrong.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, both pieces of code are wrong. The names list is a namespaced name. We should reject any name that is not of length 1, and only use the singular element.

We still need to check if the table exists in the first place.

}
std::shared_ptr<arrow::compute::ExecNodeOptions> options =
std::make_shared<arrow::compute::TableSourceNodeOptions>(std::move(output_table));
return arrow::compute::Declaration("table_source", {}, options, "java_source");
};
arrow::engine::ConversionOptions conversion_options;
conversion_options.named_table_provider = std::move(table_provider);
// execute plan
std::shared_ptr<arrow::Buffer> buffer = JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, plan)));
std::shared_ptr<arrow::RecordBatchReader> readerOut = JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer, nullptr, nullptr, conversion_options));
auto* arrow_stream_out = reinterpret_cast<ArrowArrayStream*>(memory_address_output);
JniAssertOkOrThrow(arrow::ExportRecordBatchReader(readerOut, arrow_stream_out));
JNI_METHOD_END()
}

/*
* Class: org_apache_arrow_dataset_substrait_JniWrapper
* Method: executeSerializedPlanNamedTables
* Signature: (Ljava/nio/ByteBuffer;[Ljava/lang/String;J)V
*/
JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanNamedTables__Ljava_nio_ByteBuffer_2_3Ljava_lang_String_2J (
JNIEnv* env, jobject, jobject plan, jobjectArray table_to_memory_address_input, jlong memory_address_output) {
JNI_METHOD_START
// get mapping of table name to memory address
std::unordered_map<std::string, std::shared_ptr<arrow::Table>> map_table_to_reader = ToMapTableToArrowReader(env, table_to_memory_address_input);
// create table provider
arrow::engine::NamedTableProvider table_provider = [&map_table_to_reader](const std::vector<std::string>& names, const arrow::Schema&) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code doesn't seem to have been formatted

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I suppose we don't run the formatter on JNI code?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed manually

std::shared_ptr<arrow::Table> output_table;
for (const auto& name : names) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code is duplicated

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has not been addressed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With output_table = map_table_to_reader[name], we tried to be more concise.

Is it okay to continue as it is now, it is a lambda, so you will see more details about what is happening.

What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree we should not duplicate this code. Let's create a separate function instead.

output_table = map_table_to_reader[name];
}
std::shared_ptr<arrow::compute::ExecNodeOptions> options =
std::make_shared<arrow::compute::TableSourceNodeOptions>(std::move(output_table));
return arrow::compute::Declaration("table_source", {}, options, "java_source");
};
arrow::engine::ConversionOptions conversion_options;
conversion_options.named_table_provider = std::move(table_provider);
// mapping arrow::Buffer
auto *buff = reinterpret_cast<jbyte*>(env->GetDirectBufferAddress(plan));
int length = env->GetDirectBufferCapacity(plan);
std::shared_ptr<arrow::Buffer> buffer = JniGetOrThrow(arrow::AllocateBuffer(length));
std::memcpy(buffer->mutable_data(), buff, length);
// execute plan
std::shared_ptr<arrow::RecordBatchReader> readerOut = JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer, nullptr, nullptr, conversion_options));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: follow C++ conventions here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed

auto* arrow_stream_out = reinterpret_cast<ArrowArrayStream*>(memory_address_output);
JniAssertOkOrThrow(arrow::ExportRecordBatchReader(readerOut, arrow_stream_out));
JNI_METHOD_END()
}