[spark] Supports runtime filter (#2421)
YannByron authored and JingsongLi committed Dec 1, 2023
1 parent 555ff8f commit 63262b6
Showing 8 changed files with 209 additions and 127 deletions.
@@ -19,6 +19,7 @@
package org.apache.paimon.table.source;

import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.InnerTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Projection;
@@ -61,7 +62,11 @@ public RowType readType() {

@Override
public ReadBuilder withFilter(Predicate filter) {
-this.filter = filter;
+if (this.filter == null) {
+    this.filter = filter;
+} else {
+    this.filter = PredicateBuilder.and(this.filter, filter);
+}
return this;
}

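For context: after the `withFilter` change above, repeated calls compose conjunctively instead of the last call winning, which is what lets a runtime filter stack on top of predicates already pushed down during planning. A minimal sketch of the new semantics, assuming a Paimon `Table` and two placeholder predicates `p1` and `p2`:

import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
import org.apache.paimon.table.Table
import org.apache.paimon.table.source.ReadBuilder

// Sketch only: p1 and p2 stand in for any two pushed-down predicates.
def composeFilters(table: Table, p1: Predicate, p2: Predicate): ReadBuilder = {
  // The first call stores p1; the second now stores
  // PredicateBuilder.and(p1, p2) instead of overwriting p1.
  table.newReadBuilder().withFilter(p1).withFilter(p2)
}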
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.paimon.spark

import org.apache.paimon.table.Table
import org.apache.paimon.table.source.ReadBuilder

case class PaimonScan(table: Table, readBuilder: ReadBuilder)
extends PaimonBaseScan(table, readBuilder)

This file was deleted.

@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.paimon.spark

import org.apache.paimon.spark.sources.PaimonMicroBatchStream
import org.apache.paimon.table.{DataTable, Table}
import org.apache.paimon.table.source.{ReadBuilder, Split}

import org.apache.spark.sql.connector.read.{Batch, Scan, Statistics, SupportsReportStatistics}
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream
import org.apache.spark.sql.types.StructType

import java.util.OptionalLong

import scala.collection.JavaConverters._

abstract class PaimonBaseScan(table: Table, readBuilder: ReadBuilder)
extends Scan
with SupportsReportStatistics {

protected var splits: Array[Split] = _

override def description(): String = {
s"paimon(${readBuilder.tableName()})"
}

override def readSchema(): StructType = {
SparkTypeUtils.fromPaimonRowType(readBuilder.readType())
}

override def toBatch: Batch = {
PaimonBatch(getSplits, readBuilder)
}

override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
new PaimonMicroBatchStream(table.asInstanceOf[DataTable], readBuilder, checkpointLocation)
}

override def estimateStatistics(): Statistics = {
val rowCount = getSplits.map(_.rowCount).sum
val scannedTotalSize = rowCount * readSchema().defaultSize

new Statistics {
override def sizeInBytes(): OptionalLong = OptionalLong.of(scannedTotalSize)

override def numRows(): OptionalLong = OptionalLong.of(rowCount)
}
}

private def getSplits: Array[Split] = {
if (splits == null) {
splits = readBuilder.newScan().plan().splits().asScala.toArray
}
splits
}

}
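Two implementation details worth noting: `getSplits` memoizes the planned splits so `toBatch` and `estimateStatistics` share a single scan-planning pass, and the size estimate is simply row count times the schema's default row width. Spark compares the reported `sizeInBytes` against `spark.sql.autoBroadcastJoinThreshold` (10 MB by default) when choosing join strategies. An illustrative sketch of the arithmetic, with made-up numbers:

// Illustrative numbers only; mirrors estimateStatistics above.
val rowCount = 10000L        // sum of Split.rowCount over all planned splits
val defaultRowSize = 48L     // readSchema().defaultSize of the projected columns
val sizeInBytes = rowCount * defaultRowSize
// 480000 bytes: small enough that Spark may broadcast this side of a join.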
@@ -49,7 +49,7 @@ abstract class PaimonBaseScanBuilder(table: Table)
}

override def build(): Scan = {
-new SparkScan(table, getReadBuilder());
+new PaimonScan(table, getReadBuilder());
}

/**
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.paimon.spark

import org.apache.paimon.table.Table
import org.apache.paimon.table.source.ReadBuilder

import org.apache.spark.sql.Utils.fieldReference
import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering
import org.apache.spark.sql.sources.{Filter, In}

import scala.collection.JavaConverters._

case class PaimonScan(table: Table, readBuilder: ReadBuilder)
extends PaimonBaseScan(table, readBuilder)
with SupportsRuntimeFiltering {

override def filterAttributes(): Array[NamedReference] = {
val requiredFields = readBuilder.readType().getFieldNames.asScala
table
.partitionKeys()
.asScala
.toArray
.filter(requiredFields.contains)
.map(fieldReference)
}

override def filter(filters: Array[Filter]): Unit = {
val converter = new SparkFilterConverter(table.rowType())
val partitionFilter = filters.flatMap {
case in @ In(attr, _) if table.partitionKeys().contains(attr) =>
Some(converter.convert(in))
case _ => None
}
if (partitionFilter.nonEmpty) {
readBuilder.withFilter(partitionFilter.head)
splits = readBuilder.newScan().plan().splits().asScala.toArray
}
}

}
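The runtime-filtering handshake works in two steps: during planning, Spark calls `filterAttributes()` to learn which columns are prunable (here, partition keys that survive the projection); after executing the join's build side, it calls `filter()` with concrete `In` values, and the scan re-plans its splits against the narrowed predicate. A hedged illustration of that second call, where `scan`, the column name, and the values are all placeholders:

import org.apache.spark.sql.sources.{Filter, In}

// Sketch: roughly what Spark hands to PaimonScan.filter after evaluating
// the build side of a join; "pt" and its values are hypothetical.
val runtimeFilters: Array[Filter] = Array(In("pt", Array("2023", "2024")))
scan.filter(runtimeFilters) // pushes pt IN ('2023', '2024') and re-plans splits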
@@ -19,6 +19,7 @@ package org.apache.spark.sql

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources.Filter

@@ -58,4 +59,8 @@
supportNestedPredicatePushdown: Boolean): Option[Filter] = {
DataSourceStrategy.translateFilter(predicate, supportNestedPredicatePushdown)
}

def fieldReference(name: String): NamedReference = {
FieldReference.column(name)
}
}
@@ -22,13 +22,16 @@ import org.apache.paimon.table.source.DataSplit
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.catalyst.trees.TreePattern.DYNAMIC_PRUNING_SUBQUERY
import org.apache.spark.sql.connector.read.{ScanBuilder, SupportsPushDownLimit}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.junit.jupiter.api.Assertions

import scala.collection.JavaConverters._

-class SparkPushDownTest extends PaimonSparkTestBase {
+class PaimonPushDownTest extends PaimonSparkTestBase {

import testImplicits._

test(s"Paimon push down: apply partition filter push down with non-partitioned table") {
spark.sql(s"""
@@ -129,6 +132,46 @@ class SparkPushDownTest extends PaimonSparkTestBase {
Assertions.assertFalse(scanBuilder.asInstanceOf[SupportsPushDownLimit].pushLimit(1))
}

test("Paimon pushDown: runtime filter") {
withTable("source", "t") {
Seq((1L, "x1", "2023"), (2L, "x2", "2023"), (5L, "x5", "2025"), (6L, "x6", "2026"))
.toDF("a", "b", "pt")
.createOrReplaceTempView("source")

spark.sql("""
|CREATE TABLE t (id INT, name STRING, pt STRING) PARTITIONED BY (pt)
|""".stripMargin)

spark.sql(
"""
|INSERT INTO t VALUES (1, "a", "2023"), (3, "c", "2023"), (5, "e", "2025"), (7, "g", "2027")
|""".stripMargin)

val df1 = spark.sql("""
|SELECT t.id, t.name, source.b FROM source join t
|ON source.pt = t.pt AND source.pt = '2023'
|ORDER BY t.id, source.b
|""".stripMargin)
val qe1 = df1.queryExecution
Assertions.assertFalse(qe1.analyzed.containsPattern(DYNAMIC_PRUNING_SUBQUERY))
Assertions.assertTrue(qe1.optimizedPlan.containsPattern(DYNAMIC_PRUNING_SUBQUERY))
Assertions.assertTrue(qe1.sparkPlan.containsPattern(DYNAMIC_PRUNING_SUBQUERY))
checkAnswer(
df1,
Row(1, "a", "x1") :: Row(1, "a", "x2") :: Row(3, "c", "x1") :: Row(3, "c", "x2") :: Nil)

val df2 = spark.sql("""
|SELECT t.*, source.b FROM source join t
|ON source.a = t.id AND source.pt = t.pt AND source.a > 3
|""".stripMargin)
val qe2 = df2.queryExecution
Assertions.assertFalse(qe2.analyzed.containsPattern(DYNAMIC_PRUNING_SUBQUERY))
Assertions.assertTrue(qe2.optimizedPlan.containsPattern(DYNAMIC_PRUNING_SUBQUERY))
Assertions.assertTrue(qe2.sparkPlan.containsPattern(DYNAMIC_PRUNING_SUBQUERY))
checkAnswer(df2, Row(5, "e", "2025", "x5") :: Nil)
}
}
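The three-stage assertions reflect where dynamic partition pruning enters the plan: the optimizer's PartitionPruning rule injects the subquery, so it is absent from the analyzed plan and present from the optimized plan onward. A sketch for inspecting this interactively, assuming `df` is any join probing a partitioned Paimon table:

import org.apache.spark.sql.catalyst.trees.TreePattern.DYNAMIC_PRUNING_SUBQUERY

val qe = df.queryExecution
qe.analyzed.containsPattern(DYNAMIC_PRUNING_SUBQUERY)      // false: not injected yet
qe.optimizedPlan.containsPattern(DYNAMIC_PRUNING_SUBQUERY) // true after optimization
df.explain(true) // the physical plan prints dynamicpruningexpression(...) in the scan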

private def getScanBuilder(tableName: String = "T"): ScanBuilder = {
new SparkTable(loadTable(tableName))
.newScanBuilder(CaseInsensitiveStringMap.empty())
