GH-34223: [Java] Java Substrait Consumer JNI call to ACERO C++ #34227
@github-actions crossbow submit java-jars
Revision: 7500dd7 Submitted crossbow builds: ursacomputing/crossbow @ actions-1555224742
Revision: 7500dd7 Submitted crossbow builds: ursacomputing/crossbow @ actions-c5c0135c13
/**
 * Java binding of the C++ ExecuteSerializedPlan.
 */
public class SubstraitConsumer implements AutoCloseable {
Seems like this should be an interface?
changed
 * @param substraitPlan the JSON Substrait plan.
 * @return the ArrowReader to iterate for record batches.
 */
public ArrowReader runQuery(String substraitPlan) {
Plans should be byte[] or ByteBuffer?
It seems this assumes JSON plans which is probably not what people will be working with
Could you help me, please? What are my options for passing a Substrait plan (Java byte[]) to a std::shared_ptr<arrow::Buffer> in a call made through JNI?
JNI functions can read byte arrays, you can then copy into an Arrow buffer (this is probably good anyways to avoid too many cross-boundary dependencies)
I just created this ByteBuffer with the Substrait plan:
ByteBuffer directByteBuffer = ByteBuffer.allocateDirect(64);
directByteBuffer.put("DEMO_SUBSTRAIT_PLAN".getBytes(StandardCharsets.UTF_8)); // protoPlan.toByteArray();
In the JNI wrapper, I recover the bytes with:
jbyte *buff = (jbyte *) env->GetDirectBufferAddress(plan);
How could I transform that jbyte pointer into a std::shared_ptr<arrow::Buffer>?
Get the capacity, allocate a mutable buffer, memcpy the result
changed
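A minimal Java-side sketch of the exchange discussed in this thread, assuming the plan is already available as serialized bytes. Sizing the direct buffer to the payload means the capacity read on the native side equals the plan length; with a fixed allocateDirect(64), the capacity would also cover unused trailing bytes. The names PlanBuffers and toDirectBuffer are hypothetical and are not part of the Arrow API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Arrow: copies serialized plan bytes into a direct buffer.
final class PlanBuffers {
  private PlanBuffers() {}

  static ByteBuffer toDirectBuffer(byte[] planBytes) {
    // Allocate exactly as many bytes as the plan needs, so capacity == plan length.
    ByteBuffer buffer = ByteBuffer.allocateDirect(planBytes.length);
    buffer.put(planBytes);
    // Reset the position to 0 so Java-side readers see the payload from the start.
    buffer.flip();
    return buffer;
  }

  public static void main(String[] args) {
    // Placeholder payload standing in for protoPlan.toByteArray().
    byte[] plan = "DEMO_SUBSTRAIT_PLAN".getBytes(StandardCharsets.UTF_8);
    ByteBuffer direct = toDirectBuffer(plan);
    System.out.println(direct.isDirect() + " " + direct.capacity() + " " + direct.remaining());
  }
}
```

On the native side, the reviewer's suggestion then applies unchanged: read the address and capacity of the direct buffer, allocate an Arrow buffer of that size, and copy the bytes into it.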
@@ -45,6 +45,7 @@ export ARROW_ORC
: ${ARROW_PLASMA:=ON}
export ARROW_PLASMA
: ${ARROW_S3:=ON}
: ${ARROW_SUBSTRAIT:=ON}
This has to always match ARROW_DATASET so a separate variable won't make sense
changed
7500dd7 to 92cb8d2
92cb8d2 to a0aac46
@github-actions crossbow submit java-jars
Revision: e5594f8 Submitted crossbow builds: ursacomputing/crossbow @ actions-bf821d607d
…on the class path
@github-actions crossbow submit java-jars
Revision: 223ddef Submitted crossbow builds: ursacomputing/crossbow @ actions-17d27841d2
Could you rebase on main to use #34480?
Revision: ce7800b Submitted crossbow builds: ursacomputing/crossbow @ actions-02cc882fff
java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java
Map<String, String> metadataName = new HashMap<>();
metadataName.put("ARROW:extension:name", "varchar");
metadataName.put("ARROW:extension:metadata", "varchar{length:150}");
Is the metadata actually necessary?
Yes, it is needed: in the case of local tables, we see this metadata in the response.
 * @param planInput the JSON Substrait plan.
 * @param memoryAddressOutput the memory address where RecordBatchReader is exported.
 */
public native void executeSerializedPlanLocalFiles(String planInput, long memoryAddressOutput);
This was never addressed?
java/dataset/src/main/java/org/apache/arrow/dataset/substrait/AceroSubstraitConsumer.java
try {
  AutoCloseables.close(arrowArrayStream);
} catch (RuntimeException e) {
  throw e;
} catch (Exception e) {
  throw new RuntimeException(e);
}
Why the extra catches?
Deleted
try {
  AutoCloseables.close(arrowArrayStream);
} catch (Exception e) {
  throw new RuntimeException(e);
This method is declared to throw Exception already. Why are we catching and re-throwing this?
Changed
Why are we catching it in the first place? Let it propagate.
ok, changed
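The reviewer's point can be sketched in plain Java: when a method already declares throws Exception, a finally block can call close() directly, and any failure reaches the caller unwrapped. The resource type below is a hypothetical stand-in for the arrowArrayStream in the PR, and all class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

final class PropagateCloseSketch {
  private PropagateCloseSketch() {}

  // Hypothetical resource whose close() fails with a checked exception.
  static final class FailingResource implements AutoCloseable {
    @Override
    public void close() throws Exception {
      throw new Exception("close failed");
    }
  }

  // Declared to throw Exception, so close() needs no catch-and-rethrow.
  static String execute(AutoCloseable resource, List<String> log) throws Exception {
    try {
      log.add("work done");
      return "result";
    } finally {
      // Any exception thrown here propagates to the caller as-is.
      resource.close();
    }
  }

  public static void main(String[] args) {
    List<String> log = new ArrayList<>();
    try {
      execute(new FailingResource(), log);
    } catch (Exception e) {
      // The original checked exception arrives without a RuntimeException wrapper.
      System.out.println(e.getMessage());
    }
  }
}
```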
@@ -124,8 +126,6 @@ private ArrowReader execute(ByteBuffer plan, Map<String, ArrowReader> namedTable
} finally {
  try {
    AutoCloseables.close(arrowArrayStream);
  } catch (RuntimeException e) {
    throw e;
  } catch (Exception e) {
    throw new RuntimeException(e);
Same here.
Changed
builder.replace(builder.indexOf("FILENAME_PLACEHOLDER"),
    builder.indexOf("FILENAME_PLACEHOLDER") + "FILENAME_PLACEHOLDER".length(), uri);
return builder.toString();
return plan.replace("FILENAME_PLACEHOLDER", uri);
Again: why is this a whole method? Just inline it; it's only used once.
Inline
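For reference, the two variants in the diff above are not strictly identical: String.replace substitutes every occurrence of the placeholder, while the StringBuilder version substitutes only the first. A small sketch of both, where the class name, the plan fragment, and the URI are made up for illustration:

```java
final class PlaceholderSketch {
  static final String PLACEHOLDER = "FILENAME_PLACEHOLDER";

  private PlaceholderSketch() {}

  // Older variant: replaces only the first occurrence.
  // Assumes the placeholder is present; indexOf returns -1 otherwise and replace throws.
  static String replaceFirstWithBuilder(String plan, String uri) {
    StringBuilder builder = new StringBuilder(plan);
    int start = builder.indexOf(PLACEHOLDER);
    builder.replace(start, start + PLACEHOLDER.length(), uri);
    return builder.toString();
  }

  // Newer variant: replaces every occurrence and is short enough to inline at the call site.
  static String replaceAllOccurrences(String plan, String uri) {
    return plan.replace(PLACEHOLDER, uri);
  }

  public static void main(String[] args) {
    // Made-up plan fragment and URI, for illustration only.
    String plan = "{\"uri_file\": \"FILENAME_PLACEHOLDER\"}";
    String uri = "file:///tmp/data.parquet";
    System.out.println(replaceFirstWithBuilder(plan, uri).equals(replaceAllOccurrences(plan, uri)));
  }
}
```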
The last comments are still unaddressed.
Benchmark runs are scheduled for baseline = c4ea194 and contender = 95c33d8. 95c33d8 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
['Python', 'R'] benchmarks have high level of regressions.
…ilter as a Substrait proto extended expression (#35570)

### Rationale for this change

To close #34252

### What changes are included in this PR?

This is a proposal to try to solve:

1. Receive a list of Substrait scalar expressions and use them to Project a Dataset
   - [x] Draft a Substrait Extended Expression to test (this will be generated by a 3rd-party project such as Isthmus)
   - [x] Use C++ draft PR to Serialize/Deserialize Extended Expression proto messages
   - [x] Create JNI Wrapper for ScannerBuilder::Project
   - [x] Create JNI API
   - [x] Testing coverage
   - [x] Documentation

   Current problem is: `java.lang.RuntimeException: Inferring column projection from FieldRef FieldRef.FieldPath(0)`. Not able to infer by column position, but able to infer by column name. This problem is solved by #35798

   This PR needs/uses these PRs/Issues:
   - #34834
   - #34227
   - #35579

2. Receive a Boolean-valued Substrait scalar expression and use it to filter a Dataset
   - [x] Working to identify activities

### Are these changes tested?

Initial unit test added.

### Are there any user-facing changes?

No

* Closes: #34252

Lead-authored-by: david dali susanibar arce <davi.sarces@gmail.com>
Co-authored-by: Weston Pace <weston.pace@gmail.com>
Co-authored-by: benibus <bpharks@gmx.com>
Co-authored-by: David Li <li.davidm96@gmail.com>
Co-authored-by: Dane Pitkin <48041712+danepitkin@users.noreply.github.com>
Signed-off-by: David Li <li.davidm96@gmail.com>
The purpose of this PR is to implement: