Spark: Fix Decimal value conversion in V2 filters (#8682)
aokolnychyi committed Sep 29, 2023 · 1 parent 863f396 · commit 10367c3
Showing 6 changed files with 68 additions and 0 deletions.
@@ -49,6 +49,7 @@
 import org.apache.spark.sql.connector.expressions.filter.Not;
 import org.apache.spark.sql.connector.expressions.filter.Or;
 import org.apache.spark.sql.connector.expressions.filter.Predicate;
+import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.unsafe.types.UTF8String;
 
 public class SparkV2Filters {
@@ -285,6 +286,8 @@ private static boolean isLiteral(org.apache.spark.sql.connector.expressions.Expr
   private static Object convertLiteral(Literal<?> literal) {
     if (literal.value() instanceof UTF8String) {
       return ((UTF8String) literal.value()).toString();
+    } else if (literal.value() instanceof Decimal) {
+      return ((Decimal) literal.value()).toJavaBigDecimal();
     }
     return literal.value();
   }
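
The substance of the fix: Spark's DataSourceV2 hands DECIMAL filter literals to connectors as org.apache.spark.sql.types.Decimal, while Iceberg's expression API expects java.math.BigDecimal, so convertLiteral now unwraps the Spark value before it reaches Iceberg. A minimal standalone sketch of that conversion, using only Spark's public Decimal API (the class name here is illustrative, not part of the commit):

import java.math.BigDecimal;
import org.apache.spark.sql.types.Decimal;

public class DecimalConversionSketch {
  public static void main(String[] args) {
    // Spark wraps a decimal literal such as 100.03 in its own Decimal type.
    Decimal sparkDecimal = Decimal.apply(new BigDecimal("100.03"));

    // Iceberg expression literals need java.math.BigDecimal, so unwrap it,
    // preserving the scale and precision of the original value.
    BigDecimal converted = sparkDecimal.toJavaBigDecimal();

    System.out.println(converted); // prints 100.03
  }
}

Without this branch, the raw Decimal fell through to the final return and reached Iceberg unconverted, so predicates on DECIMAL columns failed to convert and were not pushed down to the Iceberg scan.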

@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark.sql;
 
+import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.List;
@@ -38,6 +39,24 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS tmp_view");
   }
+
+  @Test
+  public void testFilterPushdownWithDecimalValues() {
+    sql(
+        "CREATE TABLE %s (id INT, salary DECIMAL(10, 2), dep STRING) "
+            + "USING iceberg "
+            + "PARTITIONED BY (dep)",
+        tableName);
+
+    sql("INSERT INTO %s VALUES (1, 100.01, 'd1')", tableName);
+    sql("INSERT INTO %s VALUES (2, 100.05, 'd1')", tableName);
+
+    checkFilters(
+        "dep = 'd1' AND salary > 100.03" /* query predicate */,
+        "isnotnull(salary) AND (salary > 100.03)" /* Spark post scan filter */,
+        "dep IS NOT NULL, salary IS NOT NULL, dep = 'd1', salary > 100.03" /* Iceberg scan filters */,
+        ImmutableList.of(row(2, new BigDecimal("100.05"), "d1")));
+  }
 
   @Test
   public void testFilterPushdownWithIdentityTransform() {
     sql(
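
The new test pins down the end-to-end behavior: the Spark predicate salary > 100.03 should now appear among the Iceberg scan filters instead of being evaluated only after the scan. A hedged sketch of the Iceberg expression the connector can build once the literal is a BigDecimal (Expressions.greaterThan is Iceberg's public expression factory; the class name is illustrative):

import java.math.BigDecimal;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

public class DecimalPushdownSketch {
  public static void main(String[] args) {
    // With a java.math.BigDecimal literal, Iceberg can bind this predicate to
    // the DECIMAL(10, 2) column and prune data files during scan planning.
    Expression filter = Expressions.greaterThan("salary", new BigDecimal("100.03"));
    System.out.println(filter);
  }
}

Note that Spark still applies its own post-scan filter, isnotnull(salary) AND (salary > 100.03); the pushdown narrows what the Iceberg scan reads but does not replace Spark-side evaluation.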

@@ -64,6 +64,7 @@
 import org.apache.spark.sql.connector.expressions.filter.Not;
 import org.apache.spark.sql.connector.expressions.filter.Or;
 import org.apache.spark.sql.connector.expressions.filter.Predicate;
+import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.unsafe.types.UTF8String;
 
 public class SparkV2Filters {
@@ -378,6 +379,8 @@ private static boolean isLiteral(org.apache.spark.sql.connector.expressions.Expr
   private static Object convertLiteral(Literal<?> literal) {
     if (literal.value() instanceof UTF8String) {
       return ((UTF8String) literal.value()).toString();
+    } else if (literal.value() instanceof Decimal) {
+      return ((Decimal) literal.value()).toJavaBigDecimal();
     }
     return literal.value();
   }

@@ -21,6 +21,7 @@
 import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
 import static org.apache.iceberg.PlanningMode.LOCAL;
 
+import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.List;
@@ -56,6 +57,25 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS tmp_view");
   }
+
+  @Test
+  public void testFilterPushdownWithDecimalValues() {
+    sql(
+        "CREATE TABLE %s (id INT, salary DECIMAL(10, 2), dep STRING) "
+            + "USING iceberg "
+            + "PARTITIONED BY (dep)",
+        tableName);
+    configurePlanningMode(planningMode);
+
+    sql("INSERT INTO %s VALUES (1, 100.01, 'd1')", tableName);
+    sql("INSERT INTO %s VALUES (2, 100.05, 'd1')", tableName);
+
+    checkFilters(
+        "dep = 'd1' AND salary > 100.03" /* query predicate */,
+        "isnotnull(salary) AND (salary > 100.03)" /* Spark post scan filter */,
+        "dep IS NOT NULL, salary IS NOT NULL, dep = 'd1', salary > 100.03" /* Iceberg scan filters */,
+        ImmutableList.of(row(2, new BigDecimal("100.05"), "d1")));
+  }
 
   @Test
   public void testFilterPushdownWithIdentityTransform() {
     sql(

@@ -64,6 +64,7 @@
 import org.apache.spark.sql.connector.expressions.filter.Not;
 import org.apache.spark.sql.connector.expressions.filter.Or;
 import org.apache.spark.sql.connector.expressions.filter.Predicate;
+import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.unsafe.types.UTF8String;
 
 public class SparkV2Filters {
@@ -378,6 +379,8 @@ private static boolean isLiteral(org.apache.spark.sql.connector.expressions.Expr
   private static Object convertLiteral(Literal<?> literal) {
     if (literal.value() instanceof UTF8String) {
       return ((UTF8String) literal.value()).toString();
+    } else if (literal.value() instanceof Decimal) {
+      return ((Decimal) literal.value()).toJavaBigDecimal();
     }
     return literal.value();
   }

@@ -21,6 +21,7 @@
 import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
 import static org.apache.iceberg.PlanningMode.LOCAL;
 
+import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.List;
@@ -56,6 +57,25 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS tmp_view");
   }
+
+  @Test
+  public void testFilterPushdownWithDecimalValues() {
+    sql(
+        "CREATE TABLE %s (id INT, salary DECIMAL(10, 2), dep STRING) "
+            + "USING iceberg "
+            + "PARTITIONED BY (dep)",
+        tableName);
+    configurePlanningMode(planningMode);
+
+    sql("INSERT INTO %s VALUES (1, 100.01, 'd1')", tableName);
+    sql("INSERT INTO %s VALUES (2, 100.05, 'd1')", tableName);
+
+    checkFilters(
+        "dep = 'd1' AND salary > 100.03" /* query predicate */,
+        "isnotnull(salary) AND (salary > 100.03)" /* Spark post scan filter */,
+        "dep IS NOT NULL, salary IS NOT NULL, dep = 'd1', salary > 100.03" /* Iceberg scan filters */,
+        ImmutableList.of(row(2, new BigDecimal("100.05"), "d1")));
+  }
 
   @Test
   public void testFilterPushdownWithIdentityTransform() {
     sql(
