From fad3473e2c379c63879ff3b8d43b90b437b6bd54 Mon Sep 17 00:00:00 2001 From: Asif Hussain Shahid Date: Tue, 26 May 2026 12:35:37 -0700 Subject: [PATCH 1/4] Bug-16563: Imposed an ordering on the runtime and data filter expressions while checking for equality and hashCode so that structurally same scans do not mismatch. This is critical for re-use of exchange to happen , where the pushdown of filters may differ in order, due to spark code using Set type collection for collecting filters for pushdown, resulting in unpredictable ordering of filters when pushed --- .../source/SparkRuntimeFilterableScan.java | 7 ++- .../iceberg/spark/source/SparkScan.java | 58 ++++++++++++++++++- .../spark/source/TestFilteredScan.java | 49 ++++++++++++++++ 3 files changed, 112 insertions(+), 2 deletions(-) diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRuntimeFilterableScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRuntimeFilterableScan.java index dfd38a154811..7ab3b260d9ae 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRuntimeFilterableScan.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRuntimeFilterableScan.java @@ -39,6 +39,7 @@ import org.apache.iceberg.expressions.Evaluator; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.ExpressionUtil; +import org.apache.iceberg.expressions.ExpressionVisitors; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.expressions.Projections; import org.apache.iceberg.metrics.ScanReport; @@ -195,6 +196,10 @@ private Expression convertRuntimePredicates(Predicate[] predicates) { } protected String runtimeFiltersDesc() { - return Spark3Util.describe(runtimeFilters); + return runtimeFilters.stream() + .flatMap(x -> ExpressionVisitors.visit(x, ExpressionFlattener.INSTANCE).stream()) + .map(Spark3Util::describe) + .sorted() + .collect(Collectors.joining(", ")); } } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java index ee61523d8028..503a65701b86 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.spark.source; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -32,8 +33,11 @@ import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.StatisticsFile; import org.apache.iceberg.Table; +import org.apache.iceberg.expressions.BoundPredicate; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.ExpressionVisitors; import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.UnboundPredicate; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.metrics.ScanReport; import org.apache.iceberg.relocated.com.google.common.base.Strings; @@ -161,7 +165,11 @@ protected Expression filter() { } protected String filtersDesc() { - return Spark3Util.describe(filters); + return filters.stream() + .flatMap(x -> ExpressionVisitors.visit(x, ExpressionFlattener.INSTANCE).stream()) + .map(Spark3Util::describe) + .sorted() + .collect(Collectors.joining(", ")); } protected Types.StructType groupingKeyType() { @@ -384,4 +392,52 @@ protected long adjustSplitSize(List tasks, long splitSize) { return splitSize; } } + + static class ExpressionFlattener extends ExpressionVisitors.ExpressionVisitor> { + + protected static final ExpressionFlattener INSTANCE = new ExpressionFlattener(); + + private ExpressionFlattener() {} + + @Override + public List alwaysTrue() { + return List.of(Expressions.alwaysTrue()); + } + + @Override + public List alwaysFalse() { + return List.of(Expressions.alwaysFalse()); + } + + @Override + public List not(List result) { + return List.of(Expressions.not(mergeExpressions(result))); + } + + @Override + public List and(List leftResult, List rightResult) { + List flattened = new ArrayList<>(leftResult); + flattened.addAll(rightResult); + return flattened; + } + + @Override + public List or(List leftResult, List rightResult) { + return List.of(Expressions.or(mergeExpressions(leftResult), mergeExpressions(rightResult))); + } + + @Override + public List predicate(BoundPredicate pred) { + return List.of(pred); + } + + @Override + public List predicate(UnboundPredicate pred) { + return List.of(pred); + } + + private Expression mergeExpressions(List toMerge) { + return toMerge.stream().reduce(Expressions.alwaysTrue(), Expressions::and); + } + } } diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java index 24fecf4eb2ca..b8ae001a375c 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java @@ -65,12 +65,14 @@ import org.apache.spark.sql.connector.expressions.filter.Predicate; import org.apache.spark.sql.connector.read.Batch; import org.apache.spark.sql.connector.read.InputPartition; +import org.apache.spark.sql.connector.read.Scan; import org.apache.spark.sql.connector.read.ScanBuilder; import org.apache.spark.sql.connector.read.SupportsPushDownV2Filters; import org.apache.spark.sql.sources.And; import org.apache.spark.sql.sources.EqualTo; import org.apache.spark.sql.sources.Filter; import org.apache.spark.sql.sources.GreaterThan; +import org.apache.spark.sql.sources.In; import org.apache.spark.sql.sources.LessThan; import org.apache.spark.sql.sources.Not; import org.apache.spark.sql.sources.StringStartsWith; @@ -269,6 +271,53 @@ public void testUnpartitionedTimestampFilter() { "ts < cast('2017-12-22 00:00:00+00:00' as timestamp)")); } + @TestTemplate + public void testSparkBatchScanEquality_Bug16563() { + CaseInsensitiveStringMap options = + new CaseInsensitiveStringMap(ImmutableMap.of("path", unpartitioned.toString())); + + // set spark.sql.caseSensitive to false + String caseSensitivityBeforeTest = TestFilteredScan.spark.conf().get("spark.sql.caseSensitive"); + TestFilteredScan.spark.conf().set("spark.sql.caseSensitive", "false"); + try { + SparkScanBuilder builder1 = + new SparkScanBuilder(spark, TABLES.load(options.get("path")), options); + Filter f1 = GreaterThan.apply("ID", 10); + Filter f2 = LessThan.apply("data", "abc"); + + pushFilters(builder1, f1, f2); + + SparkScanBuilder builder2 = + new SparkScanBuilder(spark, TABLES.load(options.get("path")), options); + pushFilters(builder2, f2, f1); + + Scan scan1 = builder1.build(); + Scan scan2 = builder2.build(); + + assertThat(scan1).as("is be equal to ").isEqualTo(scan2); + + Filter runtimeFilter1 = In.apply("ID", new Object[] {10, 12, 13}); + Filter runtimeFilter2 = In.apply("data", new Object[] {"abc", "def", "cde"}); + + ((SparkRuntimeFilterableScan) scan1) + .filter( + Arrays.stream(new Filter[] {runtimeFilter1, runtimeFilter2}) + .map(Filter::toV2) + .toArray(Predicate[]::new)); + + ((SparkRuntimeFilterableScan) scan2) + .filter( + Arrays.stream(new Filter[] {runtimeFilter2, runtimeFilter1}) + .map(Filter::toV2) + .toArray(Predicate[]::new)); + + assertThat(scan1).as("is be equal to ").isEqualTo(scan2); + } finally { + // return global conf to previous state + TestFilteredScan.spark.conf().set("spark.sql.caseSensitive", caseSensitivityBeforeTest); + } + } + @TestTemplate public void limitPushedDownToSparkScan() { assumeThat(fileFormat) From 6e5d3d40c3f45765e96c64084e167f6f42581ad8 Mon Sep 17 00:00:00 2001 From: Asif Hussain Shahid Date: Tue, 26 May 2026 13:13:22 -0700 Subject: [PATCH 2/4] Bug-16563: Fixed style failure --- .../main/java/org/apache/iceberg/spark/source/SparkScan.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java index 503a65701b86..cba57da1f114 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.spark.source; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -416,7 +415,7 @@ public List not(List result) { @Override public List and(List leftResult, List rightResult) { - List flattened = new ArrayList<>(leftResult); + List flattened = Lists.newArrayList(leftResult); flattened.addAll(rightResult); return flattened; } From 2a9659f0f0d6bef857eb6083fe68162c790ed43a Mon Sep 17 00:00:00 2001 From: Asif Hussain Shahid Date: Tue, 26 May 2026 17:56:42 -0700 Subject: [PATCH 3/4] Bug-16563: Fixed test failures occuring due to filter description string reordering causing assertion failures. Instead of doing full string matches, matching individual filter string --- .../source/SparkRuntimeFilterableScan.java | 7 +- .../iceberg/spark/source/SparkScan.java | 10 +- .../spark/source/TestFilteredScan.java | 24 ++-- .../iceberg/spark/source/TestSparkScan.java | 11 +- .../iceberg/spark/sql/TestFilterPushDown.java | 132 ++++++++++-------- 5 files changed, 100 insertions(+), 84 deletions(-) diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRuntimeFilterableScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRuntimeFilterableScan.java index 7ab3b260d9ae..a5393ab64ff8 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRuntimeFilterableScan.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRuntimeFilterableScan.java @@ -197,9 +197,8 @@ private Expression convertRuntimePredicates(Predicate[] predicates) { protected String runtimeFiltersDesc() { return runtimeFilters.stream() - .flatMap(x -> ExpressionVisitors.visit(x, ExpressionFlattener.INSTANCE).stream()) - .map(Spark3Util::describe) - .sorted() - .collect(Collectors.joining(", ")); + .flatMap(x -> ExpressionVisitors.visit(x, ExpressionFlattener.INSTANCE).stream()) + .map(Spark3Util::describe).sorted().collect(Collectors.joining(", ")); + } } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java index cba57da1f114..45e0b9ec943a 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java @@ -165,10 +165,9 @@ protected Expression filter() { protected String filtersDesc() { return filters.stream() - .flatMap(x -> ExpressionVisitors.visit(x, ExpressionFlattener.INSTANCE).stream()) - .map(Spark3Util::describe) - .sorted() - .collect(Collectors.joining(", ")); + .flatMap(x -> ExpressionVisitors.visit(x, ExpressionFlattener.INSTANCE).stream()) + .map(Spark3Util::describe) + .sorted().collect(Collectors.joining(", ")); } protected Types.StructType groupingKeyType() { @@ -396,7 +395,8 @@ static class ExpressionFlattener extends ExpressionVisitors.ExpressionVisitor
  • alwaysTrue() { diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java index b8ae001a375c..4c78d18e4a9f 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java @@ -274,21 +274,21 @@ public void testUnpartitionedTimestampFilter() { @TestTemplate public void testSparkBatchScanEquality_Bug16563() { CaseInsensitiveStringMap options = - new CaseInsensitiveStringMap(ImmutableMap.of("path", unpartitioned.toString())); + new CaseInsensitiveStringMap(ImmutableMap.of("path", unpartitioned.toString())); // set spark.sql.caseSensitive to false String caseSensitivityBeforeTest = TestFilteredScan.spark.conf().get("spark.sql.caseSensitive"); TestFilteredScan.spark.conf().set("spark.sql.caseSensitive", "false"); try { SparkScanBuilder builder1 = - new SparkScanBuilder(spark, TABLES.load(options.get("path")), options); + new SparkScanBuilder(spark, TABLES.load(options.get("path")), options); Filter f1 = GreaterThan.apply("ID", 10); Filter f2 = LessThan.apply("data", "abc"); pushFilters(builder1, f1, f2); SparkScanBuilder builder2 = - new SparkScanBuilder(spark, TABLES.load(options.get("path")), options); + new SparkScanBuilder(spark, TABLES.load(options.get("path")), options); pushFilters(builder2, f2, f1); Scan scan1 = builder1.build(); @@ -296,20 +296,14 @@ public void testSparkBatchScanEquality_Bug16563() { assertThat(scan1).as("is be equal to ").isEqualTo(scan2); - Filter runtimeFilter1 = In.apply("ID", new Object[] {10, 12, 13}); - Filter runtimeFilter2 = In.apply("data", new Object[] {"abc", "def", "cde"}); + Filter runtimeFilter1 = In.apply("ID", new Object[]{10, 12, 13}); + Filter runtimeFilter2 = In.apply("data", new Object[]{"abc", "def", "cde"}); - ((SparkRuntimeFilterableScan) scan1) - .filter( - Arrays.stream(new Filter[] {runtimeFilter1, runtimeFilter2}) - .map(Filter::toV2) - .toArray(Predicate[]::new)); + ((SparkRuntimeFilterableScan)scan1).filter( + Arrays.stream(new Filter[]{runtimeFilter1, runtimeFilter2}).map(Filter::toV2).toArray(Predicate[]::new)); - ((SparkRuntimeFilterableScan) scan2) - .filter( - Arrays.stream(new Filter[] {runtimeFilter2, runtimeFilter1}) - .map(Filter::toV2) - .toArray(Predicate[]::new)); + ((SparkRuntimeFilterableScan)scan2).filter( + Arrays.stream(new Filter[]{runtimeFilter2, runtimeFilter1}).map(Filter::toV2).toArray(Predicate[]::new)); assertThat(scan1).as("is be equal to ").isEqualTo(scan2); } finally { diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java index fd133de4baa8..a2bd0b467103 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java @@ -46,6 +46,7 @@ import org.apache.iceberg.spark.SparkCatalogConfig; import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.spark.SparkSQLProperties; +import org.apache.iceberg.spark.SparkV2Filters; import org.apache.iceberg.spark.TestBaseWithCatalog; import org.apache.iceberg.spark.functions.BucketFunction; import org.apache.iceberg.spark.functions.DaysFunction; @@ -1061,6 +1062,14 @@ public void testCopyOnWriteScanDescription() throws Exception { () -> { Predicate predicate1 = new Predicate("=", expressions(fieldRef("id"), intLit(2))); Predicate predicate2 = new Predicate("<", expressions(fieldRef("id"), intLit(10))); + String filter1Desc = Spark3Util.describe(SparkV2Filters.convert(predicate1)); + String filter2Desc = Spark3Util.describe(SparkV2Filters.convert(predicate2)); + String expectedFilterDesc; + if (filter1Desc.compareTo(filter2Desc) < 0) { + expectedFilterDesc = filter1Desc + ", " + filter2Desc; + } else { + expectedFilterDesc = filter2Desc + ", " + filter1Desc; + } pushFilters(builder, predicate1, predicate2); Scan scan = builder.buildCopyOnWriteScan(); @@ -1071,7 +1080,7 @@ public void testCopyOnWriteScanDescription() throws Exception { assertThat(description).contains("schemaId=" + table.schema().schemaId()); assertThat(description).contains("snapshotId=" + table.currentSnapshot().snapshotId()); assertThat(description).contains("branch=null"); - assertThat(description).contains("filters=id = 2, id < 10"); + assertThat(description).contains("filters=" + expectedFilterDesc); assertThat(description).contains("groupedBy=data"); }); } diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java index e5a9d63b68d6..fce7e582d74f 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java @@ -27,6 +27,7 @@ import java.nio.ByteOrder; import java.sql.Timestamp; import java.time.Instant; +import java.util.Arrays; import java.util.List; import org.apache.iceberg.Parameter; import org.apache.iceberg.ParameterizedTestExtension; @@ -92,7 +93,8 @@ public void testFilterPushdownWithDecimalValues() { checkFilters( "dep = 'd1' AND salary > 100.03" /* query predicate */, "isnotnull(salary) AND (salary > 100.03)" /* Spark post scan filter */, - "dep IS NOT NULL, salary IS NOT NULL, dep = 'd1', salary > 100.03" /* Iceberg scan filters */, + /* Iceberg scan filters */ + new String[] {"dep IS NOT NULL", "salary IS NOT NULL", "dep = 'd1'", "salary > 100.03"}, ImmutableList.of(row(2, new BigDecimal("100.05"), "d1"))); } @@ -114,12 +116,12 @@ public void testFilterPushdownWithIdentityTransform() { checkOnlyIcebergFilters( "dep IS NULL" /* query predicate */, - "dep IS NULL" /* Iceberg scan filters */, + new String[] {"dep IS NULL"} /* Iceberg scan filters */, ImmutableList.of(row(6, 600, null))); checkOnlyIcebergFilters( "dep IS NOT NULL" /* query predicate */, - "dep IS NOT NULL" /* Iceberg scan filters */, + new String[] {"dep IS NOT NULL"} /* Iceberg scan filters */, ImmutableList.of( row(1, 100, "d1"), row(2, 200, "d2"), @@ -129,82 +131,88 @@ public void testFilterPushdownWithIdentityTransform() { checkOnlyIcebergFilters( "dep = 'd3'" /* query predicate */, - "dep IS NOT NULL, dep = 'd3'" /* Iceberg scan filters */, + new String[] {"dep IS NOT NULL", "dep = 'd3'"} /* Iceberg scan filters */, ImmutableList.of(row(3, 300, "d3"))); checkOnlyIcebergFilters( "dep > 'd3'" /* query predicate */, - "dep IS NOT NULL, dep > 'd3'" /* Iceberg scan filters */, + new String[] {"dep IS NOT NULL", "dep > 'd3'"} /* Iceberg scan filters */, ImmutableList.of(row(4, 400, "d4"), row(5, 500, "d5"))); checkOnlyIcebergFilters( "dep >= 'd5'" /* query predicate */, - "dep IS NOT NULL, dep >= 'd5'" /* Iceberg scan filters */, + new String[] {"dep IS NOT NULL", "dep >= 'd5'"} /* Iceberg scan filters */, ImmutableList.of(row(5, 500, "d5"))); checkOnlyIcebergFilters( "dep < 'd2'" /* query predicate */, - "dep IS NOT NULL, dep < 'd2'" /* Iceberg scan filters */, + new String[] {"dep IS NOT NULL", "dep < 'd2'"} /* Iceberg scan filters */, ImmutableList.of(row(1, 100, "d1"))); checkOnlyIcebergFilters( "dep <= 'd2'" /* query predicate */, - "dep IS NOT NULL, dep <= 'd2'" /* Iceberg scan filters */, + new String[] {"dep IS NOT NULL", "dep <= 'd2'"} /* Iceberg scan filters */, ImmutableList.of(row(1, 100, "d1"), row(2, 200, "d2"))); checkOnlyIcebergFilters( "dep <=> 'd3'" /* query predicate */, - "dep = 'd3'" /* Iceberg scan filters */, + new String[] {"dep = 'd3'"} /* Iceberg scan filters */, ImmutableList.of(row(3, 300, "d3"))); checkOnlyIcebergFilters( "dep IN (null, 'd1')" /* query predicate */, - "dep IN ('d1')" /* Iceberg scan filters */, + new String[] {"dep IN ('d1')"} /* Iceberg scan filters */, ImmutableList.of(row(1, 100, "d1"))); checkOnlyIcebergFilters( "dep NOT IN ('d2', 'd4')" /* query predicate */, - "(dep IS NOT NULL AND dep NOT IN ('d2', 'd4'))" /* Iceberg scan filters */, + new String[] {"dep IS NOT NULL", "dep NOT IN ('d2', 'd4')"} /* Iceberg scan filters */, ImmutableList.of(row(1, 100, "d1"), row(3, 300, "d3"), row(5, 500, "d5"))); checkOnlyIcebergFilters( "dep = 'd1' AND dep IS NOT NULL" /* query predicate */, - "dep = 'd1', dep IS NOT NULL" /* Iceberg scan filters */, + new String[] {"dep = 'd1'", "dep IS NOT NULL"} /* Iceberg scan filters */, ImmutableList.of(row(1, 100, "d1"))); checkOnlyIcebergFilters( "dep = 'd1' OR dep = 'd2' OR dep = 'd3'" /* query predicate */, - "((dep = 'd1' OR dep = 'd2') OR dep = 'd3')" /* Iceberg scan filters */, + new String[] {"((dep = 'd1' OR dep = 'd2') OR dep = 'd3')"} /* Iceberg scan filters */, ImmutableList.of(row(1, 100, "d1"), row(2, 200, "d2"), row(3, 300, "d3"))); checkFilters( "dep = 'd1' AND id = 1" /* query predicate */, "isnotnull(id) AND (id = 1)" /* Spark post scan filter */, - "dep IS NOT NULL, id IS NOT NULL, dep = 'd1', id = 1" /* Iceberg scan filters */, + new String[] { + "dep IS NOT NULL", "id IS NOT NULL", "dep = 'd1'", "id = 1" + } /* Iceberg scan filters */, ImmutableList.of(row(1, 100, "d1"))); checkFilters( "dep = 'd2' OR id = 1" /* query predicate */, "(dep = d2) OR (id = 1)" /* Spark post scan filter */, - "(dep = 'd2' OR id = 1)" /* Iceberg scan filters */, + new String[] {"(dep = 'd2' OR id = 1)"} /* Iceberg scan filters */, ImmutableList.of(row(1, 100, "d1"), row(2, 200, "d2"))); checkFilters( "dep LIKE 'd1%' AND id = 1" /* query predicate */, "isnotnull(id) AND (id = 1)" /* Spark post scan filter */, - "dep IS NOT NULL, id IS NOT NULL, dep LIKE 'd1%', id = 1" /* Iceberg scan filters */, + new String[] { + "dep IS NOT NULL", "id IS NOT NULL", "dep LIKE 'd1%'", "id = 1" + } /* Iceberg scan filters */, ImmutableList.of(row(1, 100, "d1"))); checkFilters( "dep NOT LIKE 'd5%' AND (id = 1 OR id = 5)" /* query predicate */, "(id = 1) OR (id = 5)" /* Spark post scan filter */, - "dep IS NOT NULL, NOT (dep LIKE 'd5%'), (id = 1 OR id = 5)" /* Iceberg scan filters */, + new String[] { + "dep IS NOT NULL", "NOT (dep LIKE 'd5%')", "(id = 1 OR id = 5)" + } /* Iceberg scan filters */, ImmutableList.of(row(1, 100, "d1"))); checkFilters( "dep LIKE '%d5' AND id IN (1, 5)" /* query predicate */, "EndsWith(dep, d5) AND id IN (1,5)" /* Spark post scan filter */, - "dep IS NOT NULL, id IN (1, 5)" /* Iceberg scan filters */, + new String[] {"dep IS NOT NULL", "id IN (1, 5)"} /* Iceberg scan filters */, ImmutableList.of(row(5, 500, "d5"))); } @@ -226,14 +234,14 @@ public void testFilterPushdownWithHoursTransform() { () -> { checkOnlyIcebergFilters( "t IS NULL" /* query predicate */, - "t IS NULL" /* Iceberg scan filters */, + new String[] {"t IS NULL"} /* Iceberg scan filters */, ImmutableList.of(row(3, 300, null))); // strict/inclusive projections for t < TIMESTAMP '2021-06-30T02:00:00.000Z' are equal, // so this filter selects entire partitions and can be pushed down completely checkOnlyIcebergFilters( "t < TIMESTAMP '2021-06-30T02:00:00.000Z'" /* query predicate */, - "t IS NOT NULL, t < 1625018400000000" /* Iceberg scan filters */, + new String[] {"t IS NOT NULL", "t < 1625018400000000"} /* Iceberg scan filters */, ImmutableList.of(row(1, 100, timestamp("2021-06-30T01:00:00.0Z")))); // strict/inclusive projections for t < TIMESTAMP '2021-06-30T01:00:00.001Z' differ, @@ -241,7 +249,7 @@ public void testFilterPushdownWithHoursTransform() { checkFilters( "t < TIMESTAMP '2021-06-30T01:00:00.001Z'" /* query predicate */, "t < 2021-06-30 01:00:00.001" /* Spark post scan filter */, - "t IS NOT NULL, t < 1625014800001000" /* Iceberg scan filters */, + new String[] {"t IS NOT NULL", "t < 1625014800001000"} /* Iceberg scan filters */, ImmutableList.of(row(1, 100, timestamp("2021-06-30T01:00:00.0Z")))); // strict/inclusive projections for t <= TIMESTAMP '2021-06-30T01:00:00.000Z' differ, @@ -249,7 +257,7 @@ public void testFilterPushdownWithHoursTransform() { checkFilters( "t <= TIMESTAMP '2021-06-30T01:00:00.000Z'" /* query predicate */, "t <= 2021-06-30 01:00:00" /* Spark post scan filter */, - "t IS NOT NULL, t <= 1625014800000000" /* Iceberg scan filters */, + new String[] {"t IS NOT NULL", "t <= 1625014800000000"} /* Iceberg scan filters */, ImmutableList.of(row(1, 100, timestamp("2021-06-30T01:00:00.0Z")))); }); } @@ -273,14 +281,14 @@ public void testFilterPushdownWithDaysTransform() { () -> { checkOnlyIcebergFilters( "t IS NULL" /* query predicate */, - "t IS NULL" /* Iceberg scan filters */, + new String[] {"t IS NULL"} /* Iceberg scan filters */, ImmutableList.of(row(4, 400, null))); // strict/inclusive projections for t < TIMESTAMP '2021-07-05T00:00:00.000Z' are equal, // so this filter selects entire partitions and can be pushed down completely checkOnlyIcebergFilters( "t < TIMESTAMP '2021-07-05T00:00:00.000Z'" /* query predicate */, - "t IS NOT NULL, t < 1625443200000000" /* Iceberg scan filters */, + new String[] {"t IS NOT NULL", "t < 1625443200000000"} /* Iceberg scan filters */, ImmutableList.of( row(1, 100, timestamp("2021-06-15T01:00:00.000Z")), row(2, 200, timestamp("2021-06-30T02:00:00.000Z")))); @@ -290,7 +298,7 @@ public void testFilterPushdownWithDaysTransform() { checkFilters( "t < TIMESTAMP '2021-06-30T03:00:00.000Z'" /* query predicate */, "t < 2021-06-30 03:00:00" /* Spark post scan filter */, - "t IS NOT NULL, t < 1625022000000000" /* Iceberg scan filters */, + new String[] {"t IS NOT NULL", "t < 1625022000000000"} /* Iceberg scan filters */, ImmutableList.of( row(1, 100, timestamp("2021-06-15T01:00:00.000Z")), row(2, 200, timestamp("2021-06-30T02:00:00.000Z")))); @@ -316,14 +324,14 @@ public void testFilterPushdownWithMonthsTransform() { () -> { checkOnlyIcebergFilters( "t IS NULL" /* query predicate */, - "t IS NULL" /* Iceberg scan filters */, + new String[] {"t IS NULL"} /* Iceberg scan filters */, ImmutableList.of(row(4, 400, null))); // strict/inclusive projections for t < TIMESTAMP '2021-07-01T00:00:00.000Z' are equal, // so this filter selects entire partitions and can be pushed down completely checkOnlyIcebergFilters( "t < TIMESTAMP '2021-07-01T00:00:00.000Z'" /* query predicate */, - "t IS NOT NULL, t < 1625097600000000" /* Iceberg scan filters */, + new String[] {"t IS NOT NULL", "t < 1625097600000000"} /* Iceberg scan filters */, ImmutableList.of( row(1, 100, timestamp("2021-06-30T01:00:00.000Z")), row(2, 200, timestamp("2021-06-30T02:00:00.000Z")))); @@ -333,7 +341,7 @@ public void testFilterPushdownWithMonthsTransform() { checkFilters( "t < TIMESTAMP '2021-06-30T03:00:00.000Z'" /* query predicate */, "t < 2021-06-30 03:00:00" /* Spark post scan filter */, - "t IS NOT NULL, t < 1625022000000000" /* Iceberg scan filters */, + new String[] {"t IS NOT NULL", "t < 1625022000000000"} /* Iceberg scan filters */, ImmutableList.of( row(1, 100, timestamp("2021-06-30T01:00:00.000Z")), row(2, 200, timestamp("2021-06-30T02:00:00.000Z")))); @@ -359,14 +367,14 @@ public void testFilterPushdownWithYearsTransform() { () -> { checkOnlyIcebergFilters( "t IS NULL" /* query predicate */, - "t IS NULL" /* Iceberg scan filters */, + new String[] {"t IS NULL"} /* Iceberg scan filters */, ImmutableList.of(row(3, 300, null))); // strict/inclusive projections for t < TIMESTAMP '2022-01-01T00:00:00.000Z' are equal, // so this filter selects entire partitions and can be pushed down completely checkOnlyIcebergFilters( "t < TIMESTAMP '2022-01-01T00:00:00.000Z'" /* query predicate */, - "t IS NOT NULL, t < 1640995200000000" /* Iceberg scan filters */, + new String[] {"t IS NOT NULL", "t < 1640995200000000"} /* Iceberg scan filters */, ImmutableList.of( row(1, 100, timestamp("2021-06-30T01:00:00.000Z")), row(2, 200, timestamp("2021-06-30T02:00:00.000Z")))); @@ -376,7 +384,7 @@ public void testFilterPushdownWithYearsTransform() { checkFilters( "t < TIMESTAMP '2021-06-30T03:00:00.000Z'" /* query predicate */, "t < 2021-06-30 03:00:00" /* Spark post scan filter */, - "t IS NOT NULL, t < 1625022000000000" /* Iceberg scan filters */, + new String[] {"t IS NOT NULL", "t < 1625022000000000"} /* Iceberg scan filters */, ImmutableList.of( row(1, 100, timestamp("2021-06-30T01:00:00.000Z")), row(2, 200, timestamp("2021-06-30T02:00:00.000Z")))); @@ -398,7 +406,7 @@ public void testFilterPushdownWithBucketTransform() { checkFilters( "dep = 'd1' AND id = 1" /* query predicate */, "id = 1" /* Spark post scan filter */, - "dep IS NOT NULL, id IS NOT NULL, dep = 'd1'" /* Iceberg scan filters */, + new String[] {"dep IS NOT NULL", "id IS NOT NULL", "dep = 'd1'"} /* Iceberg scan filters */, ImmutableList.of(row(1, 100, "d1"))); } @@ -417,13 +425,13 @@ public void testFilterPushdownWithTruncateTransform() { checkOnlyIcebergFilters( "dep LIKE 'd%'" /* query predicate */, - "dep IS NOT NULL, dep LIKE 'd%'" /* Iceberg scan filters */, + new String[] {"dep IS NOT NULL", "dep LIKE 'd%'"} /* Iceberg scan filters */, ImmutableList.of(row(1, 100, "d1"), row(2, 200, "d2"))); checkFilters( "dep = 'd1'" /* query predicate */, "dep = d1" /* Spark post scan filter */, - "dep IS NOT NULL" /* Iceberg scan filters */, + new String[] {"dep IS NOT NULL"} /* Iceberg scan filters */, ImmutableList.of(row(1, 100, "d1"))); } @@ -441,7 +449,7 @@ public void testFilterPushdownWithSpecEvolutionAndIdentityTransforms() { // the filter can be pushed completely because all specs include identity(dep) checkOnlyIcebergFilters( "dep = 'd1'" /* query predicate */, - "dep IS NOT NULL, dep = 'd1'" /* Iceberg scan filters */, + new String[] {"dep IS NOT NULL", "dep = 'd1'"} /* Iceberg scan filters */, ImmutableList.of(row(1, 100, "d1", "sd1"))); Table table = validationCatalog.loadTable(tableIdent); @@ -453,7 +461,7 @@ public void testFilterPushdownWithSpecEvolutionAndIdentityTransforms() { // the filter can be pushed completely because all specs include identity(dep) checkOnlyIcebergFilters( "dep = 'd1'" /* query predicate */, - "dep IS NOT NULL, dep = 'd1'" /* Iceberg scan filters */, + new String[] {"dep IS NOT NULL", "dep = 'd1'"} /* Iceberg scan filters */, ImmutableList.of(row(1, 100, "d1", "sd1"))); table.updateSpec().removeField("sub_dep").removeField("dep").commit(); @@ -464,7 +472,7 @@ public void testFilterPushdownWithSpecEvolutionAndIdentityTransforms() { checkFilters( "dep = 'd1'" /* query predicate */, "isnotnull(dep) AND (dep = d1)" /* Spark post scan filter */, - "dep IS NOT NULL, dep = 'd1'" /* Iceberg scan filters */, + new String[] {"dep IS NOT NULL", "dep = 'd1'"} /* Iceberg scan filters */, ImmutableList.of(row(1, 100, "d1", "sd1"))); } @@ -482,7 +490,7 @@ public void testFilterPushdownWithSpecEvolutionAndTruncateTransform() { // the filter can be pushed completely because the current spec supports it checkOnlyIcebergFilters( "dep LIKE 'd1%'" /* query predicate */, - "dep IS NOT NULL, dep LIKE 'd1%'" /* Iceberg scan filters */, + new String[] {"dep IS NOT NULL", "dep LIKE 'd1%'"} /* Iceberg scan filters */, ImmutableList.of(row(1, 100, "d1"))); Table table = validationCatalog.loadTable(tableIdent); @@ -497,7 +505,7 @@ public void testFilterPushdownWithSpecEvolutionAndTruncateTransform() { // the filter can be pushed completely because both specs support it checkOnlyIcebergFilters( "dep LIKE 'd%'" /* query predicate */, - "dep IS NOT NULL, dep LIKE 'd%'" /* Iceberg scan filters */, + new String[] {"dep IS NOT NULL", "dep LIKE 'd%'"} /* Iceberg scan filters */, ImmutableList.of(row(1, 100, "d1"), row(2, 200, "d2"))); // the filter can't be pushed completely because the second spec is truncate(dep, 1) and @@ -505,7 +513,9 @@ public void testFilterPushdownWithSpecEvolutionAndTruncateTransform() { checkFilters( "dep LIKE 'd1%' AND id = 1" /* query predicate */, "(isnotnull(id) AND StartsWith(dep, d1)) AND (id = 1)" /* Spark post scan filter */, - "dep IS NOT NULL, id IS NOT NULL, dep LIKE 'd1%', id = 1" /* Iceberg scan filters */, + new String[] { + "dep IS NOT NULL", "id IS NOT NULL", "dep LIKE 'd1%'", "id = 1" + } /* Iceberg scan filters */, ImmutableList.of(row(1, 100, "d1"))); } @@ -526,7 +536,7 @@ public void testFilterPushdownWithSpecEvolutionAndTimeTransforms() { // the filter can be pushed completely because the current spec supports it checkOnlyIcebergFilters( "t < TIMESTAMP '2021-07-01T00:00:00.000Z'" /* query predicate */, - "t IS NOT NULL, t < 1625097600000000" /* Iceberg scan filters */, + new String[] {"t IS NOT NULL", "t < 1625097600000000"} /* Iceberg scan filters */, ImmutableList.of(row(1, 100, timestamp("2021-06-30T01:00:00.000Z")))); Table table = validationCatalog.loadTable(tableIdent); @@ -541,7 +551,7 @@ public void testFilterPushdownWithSpecEvolutionAndTimeTransforms() { // the filter can be pushed completely because both specs support it checkOnlyIcebergFilters( "t < TIMESTAMP '2021-06-01T00:00:00.000Z'" /* query predicate */, - "t IS NOT NULL, t < 1622505600000000" /* Iceberg scan filters */, + new String[] {"t IS NOT NULL", "t < 1622505600000000"} /* Iceberg scan filters */, ImmutableList.of(row(2, 200, timestamp("2021-05-30T01:00:00.000Z")))); }); } @@ -560,28 +570,28 @@ public void testFilterPushdownWithSpecialFloatingPointPartitionValues() { checkOnlyIcebergFilters( "salary = 100.5" /* query predicate */, - "salary IS NOT NULL, salary = 100.5" /* Iceberg scan filters */, + new String[] {"salary IS NOT NULL", "salary = 100.5"} /* Iceberg scan filters */, ImmutableList.of(row(1, 100.5))); checkOnlyIcebergFilters( "salary = double('NaN')" /* query predicate */, - "salary IS NOT NULL, is_nan(salary)" /* Iceberg scan filters */, + new String[] {"salary IS NOT NULL", "is_nan(salary)"} /* Iceberg scan filters */, ImmutableList.of(row(2, Double.NaN))); checkOnlyIcebergFilters( "salary != double('NaN')" /* query predicate */, - "salary IS NOT NULL, NOT (is_nan(salary))" /* Iceberg scan filters */, + new String[] {"salary IS NOT NULL", "NOT (is_nan(salary))"} /* Iceberg scan filters */, ImmutableList.of( row(1, 100.5), row(3, Double.POSITIVE_INFINITY), row(4, Double.NEGATIVE_INFINITY))); checkOnlyIcebergFilters( "salary = double('infinity')" /* query predicate */, - "salary IS NOT NULL, salary = Infinity" /* Iceberg scan filters */, + new String[] {"salary IS NOT NULL", "salary = Infinity"} /* Iceberg scan filters */, ImmutableList.of(row(3, Double.POSITIVE_INFINITY))); checkOnlyIcebergFilters( "salary = double('-infinity')" /* query predicate */, - "salary IS NOT NULL, salary = -Infinity" /* Iceberg scan filters */, + new String[] {"salary IS NOT NULL", "salary = -Infinity"} /* Iceberg scan filters */, ImmutableList.of(row(4, Double.NEGATIVE_INFINITY))); } @@ -607,7 +617,7 @@ public void testVariantExtractFiltering() { checkFilters( "try_variant_get(data, '$.num', 'int') IS NOT NULL", "isnotnull(data) AND isnotnull(try_variant_get(data, $.num, IntegerType, false, Some(UTC)))", - "data IS NOT NULL", + new String[] {"data IS NOT NULL"}, ImmutableList.of( row(1L, toSparkVariantRow("foo", 25)), row(2L, toSparkVariantRow("bar", 30)), @@ -616,45 +626,44 @@ public void testVariantExtractFiltering() { checkFilters( "try_variant_get(data, '$.num', 'int') IS NULL", "isnull(try_variant_get(data, $.num, IntegerType, false, Some(UTC)))", - "", + new String[] {""}, ImmutableList.of(row(4L, null))); checkFilters( "try_variant_get(data, '$.num', 'int') > 30", "isnotnull(data) AND (try_variant_get(data, $.num, IntegerType, false, Some(UTC)) > 30)", - "data IS NOT NULL", + new String[] {"data IS NOT NULL"}, ImmutableList.of(row(3L, toSparkVariantRow("baz", 35)))); checkFilters( "try_variant_get(data, '$.num', 'int') = 30", "isnotnull(data) AND (try_variant_get(data, $.num, IntegerType, false, Some(UTC)) = 30)", - "data IS NOT NULL", + new String[] {"data IS NOT NULL"}, ImmutableList.of(row(2L, toSparkVariantRow("bar", 30)))); checkFilters( "try_variant_get(data, '$.num', 'int') IN (25, 35)", "try_variant_get(data, $.num, IntegerType, false, Some(UTC)) IN (25,35)", - "", + new String[] {""}, ImmutableList.of( row(1L, toSparkVariantRow("foo", 25)), row(3L, toSparkVariantRow("baz", 35)))); checkFilters( "try_variant_get(data, '$.num', 'int') != 25", "isnotnull(data) AND NOT (try_variant_get(data, $.num, IntegerType, false, Some(UTC)) = 25)", - "data IS NOT NULL", + new String[] {"data IS NOT NULL"}, ImmutableList.of( row(2L, toSparkVariantRow("bar", 30)), row(3L, toSparkVariantRow("baz", 35)))); }); } private void checkOnlyIcebergFilters( - String predicate, String icebergFilters, List expectedRows) { - + String predicate, String[] icebergFilters, List expectedRows) { checkFilters(predicate, null, icebergFilters, expectedRows); } private void checkFilters( - String predicate, String sparkFilter, String icebergFilters, List expectedRows) { + String predicate, String sparkFilter, String[] icebergFilters, List expectedRows) { Action check = () -> { @@ -674,9 +683,14 @@ private void checkFilters( assertThat(planAsString).as("Should be no post scan filter").doesNotContain("Filter ("); } - assertThat(planAsString) - .as("Pushed filters must match") - .contains(", filters=" + icebergFilters + ","); + int startIndex = planAsString.indexOf("filters="); + int endIndex = planAsString.indexOf("runtimeFilters"); + String filterStringFromPlan = planAsString.substring(startIndex, endIndex); + Arrays.stream(icebergFilters) + .forEach( + filter -> { + assertThat(filterStringFromPlan).as("Pushed filters must contain").contains(filter); + }); } private Timestamp timestamp(String timestampAsString) { From e8ebbd4193e45d5a08e6fe64072bb54456416005 Mon Sep 17 00:00:00 2001 From: Asif Hussain Shahid Date: Tue, 26 May 2026 18:16:55 -0700 Subject: [PATCH 4/4] Bug-16563: Fixed style failure --- .../source/SparkRuntimeFilterableScan.java | 7 +++--- .../iceberg/spark/source/SparkScan.java | 10 ++++---- .../spark/source/TestFilteredScan.java | 24 ++++++++++++------- 3 files changed, 24 insertions(+), 17 deletions(-) diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRuntimeFilterableScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRuntimeFilterableScan.java index a5393ab64ff8..7ab3b260d9ae 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRuntimeFilterableScan.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRuntimeFilterableScan.java @@ -197,8 +197,9 @@ private Expression convertRuntimePredicates(Predicate[] predicates) { protected String runtimeFiltersDesc() { return runtimeFilters.stream() - .flatMap(x -> ExpressionVisitors.visit(x, ExpressionFlattener.INSTANCE).stream()) - .map(Spark3Util::describe).sorted().collect(Collectors.joining(", ")); - + .flatMap(x -> ExpressionVisitors.visit(x, ExpressionFlattener.INSTANCE).stream()) + .map(Spark3Util::describe) + .sorted() + .collect(Collectors.joining(", ")); } } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java index 45e0b9ec943a..cba57da1f114 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java @@ -165,9 +165,10 @@ protected Expression filter() { protected String filtersDesc() { return filters.stream() - .flatMap(x -> ExpressionVisitors.visit(x, ExpressionFlattener.INSTANCE).stream()) - .map(Spark3Util::describe) - .sorted().collect(Collectors.joining(", ")); + .flatMap(x -> ExpressionVisitors.visit(x, ExpressionFlattener.INSTANCE).stream()) + .map(Spark3Util::describe) + .sorted() + .collect(Collectors.joining(", ")); } protected Types.StructType groupingKeyType() { @@ -395,8 +396,7 @@ static class ExpressionFlattener extends ExpressionVisitors.ExpressionVisitor
  • alwaysTrue() { diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java index 4c78d18e4a9f..b8ae001a375c 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java @@ -274,21 +274,21 @@ public void testUnpartitionedTimestampFilter() { @TestTemplate public void testSparkBatchScanEquality_Bug16563() { CaseInsensitiveStringMap options = - new CaseInsensitiveStringMap(ImmutableMap.of("path", unpartitioned.toString())); + new CaseInsensitiveStringMap(ImmutableMap.of("path", unpartitioned.toString())); // set spark.sql.caseSensitive to false String caseSensitivityBeforeTest = TestFilteredScan.spark.conf().get("spark.sql.caseSensitive"); TestFilteredScan.spark.conf().set("spark.sql.caseSensitive", "false"); try { SparkScanBuilder builder1 = - new SparkScanBuilder(spark, TABLES.load(options.get("path")), options); + new SparkScanBuilder(spark, TABLES.load(options.get("path")), options); Filter f1 = GreaterThan.apply("ID", 10); Filter f2 = LessThan.apply("data", "abc"); pushFilters(builder1, f1, f2); SparkScanBuilder builder2 = - new SparkScanBuilder(spark, TABLES.load(options.get("path")), options); + new SparkScanBuilder(spark, TABLES.load(options.get("path")), options); pushFilters(builder2, f2, f1); Scan scan1 = builder1.build(); @@ -296,14 +296,20 @@ public void testSparkBatchScanEquality_Bug16563() { assertThat(scan1).as("is be equal to ").isEqualTo(scan2); - Filter runtimeFilter1 = In.apply("ID", new Object[]{10, 12, 13}); - Filter runtimeFilter2 = In.apply("data", new Object[]{"abc", "def", "cde"}); + Filter runtimeFilter1 = In.apply("ID", new Object[] {10, 12, 13}); + Filter runtimeFilter2 = In.apply("data", new Object[] {"abc", "def", "cde"}); - ((SparkRuntimeFilterableScan)scan1).filter( - Arrays.stream(new Filter[]{runtimeFilter1, runtimeFilter2}).map(Filter::toV2).toArray(Predicate[]::new)); + ((SparkRuntimeFilterableScan) scan1) + .filter( + Arrays.stream(new Filter[] {runtimeFilter1, runtimeFilter2}) + .map(Filter::toV2) + .toArray(Predicate[]::new)); - ((SparkRuntimeFilterableScan)scan2).filter( - Arrays.stream(new Filter[]{runtimeFilter2, runtimeFilter1}).map(Filter::toV2).toArray(Predicate[]::new)); + ((SparkRuntimeFilterableScan) scan2) + .filter( + Arrays.stream(new Filter[] {runtimeFilter2, runtimeFilter1}) + .map(Filter::toV2) + .toArray(Predicate[]::new)); assertThat(scan1).as("is be equal to ").isEqualTo(scan2); } finally {