Skip to content

Commit

Permalink
[SPARK-36645][SQL][FOLLOWUP] Disable min/max push down for Parquet Bi…
Browse files Browse the repository at this point in the history
…nary

### What changes were proposed in this pull request?
Disable min/max aggregate push down for Parquet Binary columns, which could be Spark StringType, BinaryType or DecimalType.

### Why are the changes needed?
Parquet Binary min/max statistics could be truncated (see https://issues.apache.org/jira/browse/PARQUET-1685). We may get wrong results if we rely on Parquet Binary min/max for aggregate push down.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Modified the existing tests.

Closes #34346 from huaxingao/disableBinary.

Authored-by: Huaxin Gao <huaxin_gao@apple.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
huaxingao authored and HyukjinKwon committed Oct 22, 2021
1 parent dc60791 commit 724d6a8
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, Spark
import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{ArrayType, LongType, MapType, StructField, StructType, TimestampType}
import org.apache.spark.sql.types.{BooleanType, ByteType, DateType, DoubleType, FloatType, IntegerType, LongType, ShortType, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

case class ParquetScanBuilder(
Expand Down Expand Up @@ -114,11 +114,15 @@ case class ParquetScanBuilder(
// not push down complex type
// not push down Timestamp because INT96 sort order is undefined,
// Parquet doesn't return statistics for INT96
case StructType(_) | ArrayType(_, _) | MapType(_, _, _) | TimestampType =>
false
case _ =>
// not push down Parquet Binary because min/max could be truncated
// (https://issues.apache.org/jira/browse/PARQUET-1685), Parquet Binary
// could be Spark StringType, BinaryType or DecimalType
case BooleanType | ByteType | ShortType | IntegerType
| LongType | FloatType | DoubleType | DateType =>
finalSchema = finalSchema.add(structField.copy(s"$aggType(" + structField.name + ")"))
true
case _ =>
false
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,89 +361,86 @@ abstract class ParquetAggregatePushDownSuite
withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
vectorizedReaderEnabledKey -> testVectorizedReader) {

val testMinWithTS = sql("SELECT min(StringCol), min(BooleanCol), min(ByteCol), " +
val testMinWithAllTypes = sql("SELECT min(StringCol), min(BooleanCol), min(ByteCol), " +
"min(BinaryCol), min(ShortCol), min(IntegerCol), min(LongCol), min(FloatCol), " +
"min(DoubleCol), min(DecimalCol), min(DateCol), min(TimestampCol) FROM test")

// INT96 (Timestamp) sort order is undefined, parquet doesn't return stats for this type
// so aggregates are not pushed down
testMinWithTS.queryExecution.optimizedPlan.collect {
// In addition, Parquet Binary min/max could be truncated, so we disable aggregate
// push down for Parquet Binary (could be Spark StringType, BinaryType or DecimalType)
testMinWithAllTypes.queryExecution.optimizedPlan.collect {
case _: DataSourceV2ScanRelation =>
val expected_plan_fragment =
"PushedAggregation: []"
checkKeywordsExistsInExplain(testMinWithTS, expected_plan_fragment)
checkKeywordsExistsInExplain(testMinWithAllTypes, expected_plan_fragment)
}

checkAnswer(testMinWithTS, Seq(Row("a string", false, 1.toByte, "Parquet".getBytes,
2.toShort, 3, -9223372036854775808L, 0.15.toFloat, 0.75D, 1.23457,
("2004-06-19").date, ("1999-08-26 10:43:59.123").ts)))
checkAnswer(testMinWithAllTypes, Seq(Row("a string", false, 1.toByte,
"Parquet".getBytes, 2.toShort, 3, -9223372036854775808L, 0.15.toFloat, 0.75D,
1.23457, ("2004-06-19").date, ("1999-08-26 10:43:59.123").ts)))

val testMinWithOutTS = sql("SELECT min(StringCol), min(BooleanCol), min(ByteCol), " +
"min(BinaryCol), min(ShortCol), min(IntegerCol), min(LongCol), min(FloatCol), " +
"min(DoubleCol), min(DecimalCol), min(DateCol) FROM test")
val testMinWithOutTSAndBinary = sql("SELECT min(BooleanCol), min(ByteCol), " +
"min(ShortCol), min(IntegerCol), min(LongCol), min(FloatCol), " +
"min(DoubleCol), min(DateCol) FROM test")

testMinWithOutTS.queryExecution.optimizedPlan.collect {
testMinWithOutTSAndBinary.queryExecution.optimizedPlan.collect {
case _: DataSourceV2ScanRelation =>
val expected_plan_fragment =
"PushedAggregation: [MIN(StringCol), " +
"MIN(BooleanCol), " +
"PushedAggregation: [MIN(BooleanCol), " +
"MIN(ByteCol), " +
"MIN(BinaryCol), " +
"MIN(ShortCol), " +
"MIN(IntegerCol), " +
"MIN(LongCol), " +
"MIN(FloatCol), " +
"MIN(DoubleCol), " +
"MIN(DecimalCol), " +
"MIN(DateCol)]"
checkKeywordsExistsInExplain(testMinWithOutTS, expected_plan_fragment)
checkKeywordsExistsInExplain(testMinWithOutTSAndBinary, expected_plan_fragment)
}

checkAnswer(testMinWithOutTS, Seq(Row("a string", false, 1.toByte, "Parquet".getBytes,
2.toShort, 3, -9223372036854775808L, 0.15.toFloat, 0.75D, 1.23457,
("2004-06-19").date)))
checkAnswer(testMinWithOutTSAndBinary, Seq(Row(false, 1.toByte,
2.toShort, 3, -9223372036854775808L, 0.15.toFloat, 0.75D, ("2004-06-19").date)))

val testMaxWithTS = sql("SELECT max(StringCol), max(BooleanCol), max(ByteCol), " +
"max(BinaryCol), max(ShortCol), max(IntegerCol), max(LongCol), max(FloatCol), " +
"max(DoubleCol), max(DecimalCol), max(DateCol), max(TimestampCol) FROM test")
val testMaxWithAllTypes = sql("SELECT max(StringCol), max(BooleanCol), " +
"max(ByteCol), max(BinaryCol), max(ShortCol), max(IntegerCol), max(LongCol), " +
"max(FloatCol), max(DoubleCol), max(DecimalCol), max(DateCol), max(TimestampCol) " +
"FROM test")

// INT96 (Timestamp) sort order is undefined, parquet doesn't return stats for this type
// so aggregates are not pushed down
testMaxWithTS.queryExecution.optimizedPlan.collect {
// In addition, Parquet Binary min/max could be truncated, so we disable aggregate
// push down for Parquet Binary (could be Spark StringType, BinaryType or DecimalType)
testMaxWithAllTypes.queryExecution.optimizedPlan.collect {
case _: DataSourceV2ScanRelation =>
val expected_plan_fragment =
"PushedAggregation: []"
checkKeywordsExistsInExplain(testMaxWithTS, expected_plan_fragment)
checkKeywordsExistsInExplain(testMaxWithAllTypes, expected_plan_fragment)
}

checkAnswer(testMaxWithTS, Seq(Row("test string", true, 16.toByte,
checkAnswer(testMaxWithAllTypes, Seq(Row("test string", true, 16.toByte,
"Spark SQL".getBytes, 222.toShort, 113, 9223372036854775807L, 0.25.toFloat, 0.85D,
12345.678, ("2021-01-01").date, ("2021-01-01 23:50:59.123").ts)))

val testMaxWithoutTS = sql("SELECT max(StringCol), max(BooleanCol), max(ByteCol), " +
"max(BinaryCol), max(ShortCol), max(IntegerCol), max(LongCol), max(FloatCol), " +
"max(DoubleCol), max(DecimalCol), max(DateCol) FROM test")
val testMaxWithoutTSAndBinary = sql("SELECT max(BooleanCol), max(ByteCol), " +
"max(ShortCol), max(IntegerCol), max(LongCol), max(FloatCol), " +
"max(DoubleCol), max(DateCol) FROM test")

testMaxWithoutTS.queryExecution.optimizedPlan.collect {
testMaxWithoutTSAndBinary.queryExecution.optimizedPlan.collect {
case _: DataSourceV2ScanRelation =>
val expected_plan_fragment =
"PushedAggregation: [MAX(StringCol), " +
"MAX(BooleanCol), " +
"PushedAggregation: [MAX(BooleanCol), " +
"MAX(ByteCol), " +
"MAX(BinaryCol), " +
"MAX(ShortCol), " +
"MAX(IntegerCol), " +
"MAX(LongCol), " +
"MAX(FloatCol), " +
"MAX(DoubleCol), " +
"MAX(DecimalCol), " +
"MAX(DateCol)]"
checkKeywordsExistsInExplain(testMaxWithoutTS, expected_plan_fragment)
checkKeywordsExistsInExplain(testMaxWithoutTSAndBinary, expected_plan_fragment)
}

checkAnswer(testMaxWithoutTS, Seq(Row("test string", true, 16.toByte,
"Spark SQL".getBytes, 222.toShort, 113, 9223372036854775807L, 0.25.toFloat, 0.85D,
12345.678, ("2021-01-01").date)))
checkAnswer(testMaxWithoutTSAndBinary, Seq(Row(true, 16.toByte,
222.toShort, 113, 9223372036854775807L, 0.25.toFloat, 0.85D, ("2021-01-01").date)))

val testCount = sql("SELECT count(StringCol), count(BooleanCol)," +
" count(ByteCol), count(BinaryCol), count(ShortCol), count(IntegerCol)," +
Expand Down

0 comments on commit 724d6a8

Please sign in to comment.