[FLINK] Support ORC filesystem sink format#12327
Open
zhanglistar wants to merge 32 commits into
Open
Conversation
b9fe77f to
4e42142
Compare
KevinyhZou
reviewed
Jun 23, 2026
647a12f to
43d0e3f
Compare
Contributor
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (1)
gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java:259
- verifyQ10OrcOutput() only checks Files.exists(outputDir), which would also pass if the path exists but is not a directory (e.g., a leftover file). Using Files.isDirectory makes the assertion stricter and avoids misleading failures later in Files.walk.
}
String insertQuery = sqlStatements[sqlStatements.length - 2].trim();
endInput() now calls drainOutput(this::finishTask) instead of finishTask() directly, so the GlutenMailboxOperatorHelper's reentrancy protection applies during end-of-input draining. This prevents nested drain races if a Velox callback schedules a drain concurrently. Also fixes IOException handling in ORC output dir cleanup.
Contributor
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated no new comments.
Comments suppressed due to low confidence (1)
gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java:263
- This AssertJ call doesn't assert anything because it doesn't chain an assertion (e.g. .isTrue()). As written, the test will pass even if checkJobRunningStatus() returns false, reducing coverage for the Kafka-source path.
String insertQuery = sqlStatements[sqlStatements.length - 2].trim();
Align finishTask() with GlutenSourceFunction which treats unknown states as normal termination (return) rather than failing. Log a warning and return, avoiding pipeline failures if velox4j adds new non-error states.
CI environment may not roll .inprogress files to part-* within the test window due to 1min rolling policy. Change filter from startsWith(part-) to exclude marker files (startsWith(_)), so both final and in-progress ORC data files are verified.
Comment on lines
+226
to
+247
| // Clean the ORC output directory before running q10_orc to ensure deterministic verification. | ||
| if ("q10_orc.sql".equals(queryFileName)) { | ||
| Path orcOutputDir = Paths.get("/tmp/data/output/bid_orc"); | ||
| if (Files.exists(orcOutputDir)) { | ||
| try { | ||
| try (java.util.stream.Stream<Path> files = Files.walk(orcOutputDir)) { | ||
| files | ||
| .sorted(java.util.Comparator.reverseOrder()) | ||
| .forEach( | ||
| p -> { | ||
| try { | ||
| Files.deleteIfExists(p); | ||
| } catch (IOException e) { | ||
| throw new RuntimeException("Failed to delete " + p, e); | ||
| } | ||
| }); | ||
| } | ||
| } catch (IOException e) { | ||
| throw new RuntimeException("Failed to clean ORC output directory", e); | ||
| } | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes are proposed in this pull request?
Gluten Flink support ORC flilesystem sink format, solves #12203.
Depends on bigo-sg/velox4j#43 and bigo-sg/velox#52.
How was this patch tested?
UT
Was this patch authored or co-authored using generative AI tooling?