From 03eaaee052a5e02dd22e9dcf335be27b0fdf5cb7 Mon Sep 17 00:00:00 2001 From: shammonFY Date: Mon, 13 Feb 2023 17:00:35 +0800 Subject: [PATCH 1/2] [FLINK-31032] Supports AND push down for orc format --- .../store/format/orc/filter/OrcFilters.java | 23 +++++++++++++++++++ .../filter/OrcPredicateFunctionVisitor.java | 11 ++++++++- .../format/orc/OrcFilterConverterTest.java | 19 +++++++++++++++ 3 files changed, 52 insertions(+), 1 deletion(-) diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcFilters.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcFilters.java index 236ce65d3edd..73ac1856941e 100644 --- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcFilters.java +++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcFilters.java @@ -344,4 +344,27 @@ public String toString() { return "OR(" + Arrays.toString(preds) + ")"; } } + + /** An AND predicate that can be evaluated by the OrcInputFormat. */ + public static class And extends Predicate { + private final Predicate[] preds; + + public And(Predicate... predicates) { + this.preds = predicates; + } + + @Override + public SearchArgument.Builder add(SearchArgument.Builder builder) { + SearchArgument.Builder withAnd = builder.startAnd(); + for (Predicate pred : preds) { + withAnd = pred.add(withAnd); + } + return withAnd.end(); + } + + @Override + public String toString() { + return "AND(" + Arrays.toString(preds) + ")"; + } + } } diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcPredicateFunctionVisitor.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcPredicateFunctionVisitor.java index 539969aa90c2..062e807c39ee 100644 --- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcPredicateFunctionVisitor.java +++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcPredicateFunctionVisitor.java @@ -124,7 +124,16 @@ public Optional visitNotIn(FieldRef fieldRef, List @Override public Optional visitAnd(List> children) { - return Optional.empty(); + if (children.size() != 2) { + throw new RuntimeException("Illegal and children: " + children.size()); + } + + Optional c1 = children.get(0); + if (!c1.isPresent()) { + return Optional.empty(); + } + Optional c2 = children.get(1); + return c2.map(value -> new OrcFilters.And(c1.get(), value)); } @Override diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFilterConverterTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFilterConverterTest.java index 702847a218a1..95af12955bcc 100644 --- a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFilterConverterTest.java +++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFilterConverterTest.java @@ -73,6 +73,25 @@ public void testApplyPredicate() { new OrcFilters.Equals("long1", PredicateLeaf.Type.LONG, 1), new OrcFilters.Equals("long1", PredicateLeaf.Type.LONG, 2)), new OrcFilters.Equals("long1", PredicateLeaf.Type.LONG, 3))); + + test( + builder.between(0, 1L, 3L), + new OrcFilters.And( + new OrcFilters.Not( + new OrcFilters.LessThan("long1", PredicateLeaf.Type.LONG, 1)), + new OrcFilters.LessThanEquals("long1", PredicateLeaf.Type.LONG, 3))); + + test( + builder.notIn(0, Arrays.asList(1L, 2L, 3L)), + new OrcFilters.And( + new OrcFilters.And( + new OrcFilters.Not( + new OrcFilters.Equals("long1", PredicateLeaf.Type.LONG, 1)), + new OrcFilters.Not( + new OrcFilters.Equals( + "long1", PredicateLeaf.Type.LONG, 2))), + new OrcFilters.Not( + new OrcFilters.Equals("long1", PredicateLeaf.Type.LONG, 3)))); } private void test(Predicate predicate, OrcFilters.Predicate orcPredicate) { From 16e6077b8b2d1c5b4d7586d9d4e3b877cba6c4be Mon Sep 17 00:00:00 2001 From: shammonFY Date: Tue, 14 Feb 2023 08:10:26 +0800 Subject: [PATCH 2/2] [FLINK-31032] Add docs --- .../flink/table/store/format/orc/filter/OrcFilters.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcFilters.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcFilters.java index 73ac1856941e..ce18540bc5c9 100644 --- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcFilters.java +++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcFilters.java @@ -349,6 +349,11 @@ public String toString() { public static class And extends Predicate { private final Predicate[] preds; + /** + * Creates an AND predicate. + * + * @param predicates The disjunctive predicates. + */ public And(Predicate... predicates) { this.preds = predicates; }