@@ -42,15 +42,60 @@ case class PaimonScan(
     override val pushDownLimit: Option[Int],
     override val pushDownTopN: Option[TopN],
     bucketedScanDisabled: Boolean = false)
-  extends PaimonBaseScan(table, requiredSchema, filters, reservedFilters, pushDownLimit)
-  with SupportsRuntimeV2Filtering
-  with SupportsReportPartitioning
-  with SupportsReportOrdering {
-
+  extends PaimonScanCommon(
+    table,
+    requiredSchema,
+    filters,
+    reservedFilters,
+    pushDownLimit,
+    pushDownTopN,
+    bucketedScanDisabled)
+  with SupportsRuntimeV2Filtering {
   def disableBucketedScan(): PaimonScan = {
     copy(bucketedScanDisabled = true)
   }
 
+  // Since Spark 3.2
+  override def filterAttributes(): Array[NamedReference] = {
+    val requiredFields = readBuilder.readType().getFieldNames.asScala
+    table
+      .partitionKeys()
+      .asScala
+      .toArray
+      .filter(requiredFields.contains)
+      .map(fieldReference)
+  }
+
+  override def filter(predicates: Array[SparkPredicate]): Unit = {
+    val converter = SparkV2FilterConverter(table.rowType())
+    val partitionKeys = table.partitionKeys().asScala.toSeq
+    val partitionFilter = predicates.flatMap {
+      case p
+          if SparkV2FilterConverter(table.rowType()).isSupportedRuntimeFilter(p, partitionKeys) =>
+        converter.convert(p)
+      case _ => None
+    }
+    if (partitionFilter.nonEmpty) {
+      readBuilder.withFilter(partitionFilter.toList.asJava)
+      // Reset inputPartitions to null so the splits are recomputed with the new filter.
+      inputPartitions = null
+      inputSplits = null
+    }
+  }
+}
+
+abstract class PaimonScanCommon(
+    table: InnerTable,
+    requiredSchema: StructType,
+    filters: Seq[Predicate],
+    reservedFilters: Seq[Filter],
+    override val pushDownLimit: Option[Int],
+    override val pushDownTopN: Option[TopN],
+    bucketedScanDisabled: Boolean = false)
+  extends PaimonBaseScan(table, requiredSchema, filters, reservedFilters, pushDownLimit)
+  with SupportsReportPartitioning
+  with SupportsReportOrdering {
+
   @transient
   private lazy val extractBucketTransform: Option[Transform] = {
     table match {
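
The two overrides moved into PaimonScan above implement Spark's SupportsRuntimeV2Filtering contract: filterAttributes() advertises which columns can receive runtime filters (here, the partition keys that survive projection), and filter() narrows the scan once concrete values arrive at execution time. A minimal sketch of how a caller might drive that contract, assuming a hypothetical pruneAtRuntime driver (the reference-matching logic below is illustrative, not Spark internals):

import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.connector.expressions.filter.{Predicate => SparkPredicate}
import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering

// Illustrative driver: forward only the runtime predicates that touch
// attributes the scan declared as filterable, then let the scan re-plan.
def pruneAtRuntime(
    scan: SupportsRuntimeV2Filtering,
    runtimePredicates: Array[SparkPredicate]): Unit = {
  val filterable: Array[NamedReference] = scan.filterAttributes()
  val applicable = runtimePredicates.filter { p =>
    // Expression.describe() gives a stable textual form to compare references by.
    p.references().exists(ref => filterable.exists(_.describe() == ref.describe()))
  }
  if (applicable.nonEmpty) {
    // PaimonScan converts these to Paimon predicates and drops its cached
    // inputPartitions/inputSplits so the next planning pass recomputes them.
    scan.filter(applicable)
  }
}
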
@@ -175,32 +220,4 @@ case class PaimonScan(
       }
       .toSeq
   }
-
-  // Since Spark 3.2
-  override def filterAttributes(): Array[NamedReference] = {
-    val requiredFields = readBuilder.readType().getFieldNames.asScala
-    table
-      .partitionKeys()
-      .asScala
-      .toArray
-      .filter(requiredFields.contains)
-      .map(fieldReference)
-  }
-
-  override def filter(predicates: Array[SparkPredicate]): Unit = {
-    val converter = SparkV2FilterConverter(table.rowType())
-    val partitionKeys = table.partitionKeys().asScala.toSeq
-    val partitionFilter = predicates.flatMap {
-      case p
-          if SparkV2FilterConverter(table.rowType()).isSupportedRuntimeFilter(p, partitionKeys) =>
-        converter.convert(p)
-      case _ => None
-    }
-    if (partitionFilter.nonEmpty) {
-      readBuilder.withFilter(partitionFilter.toList.asJava)
-      // set inputPartitions null to trigger to get the new splits.
-      inputPartitions = null
-      inputSplits = null
-    }
-  }
 }
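
The net effect of the diff: the runtime-filtering surface stays on the concrete PaimonScan, while partitioning/ordering reporting and the bucket-transform logic move to the shared abstract PaimonScanCommon, where other scan variants can reuse them. A self-contained toy of that shape, assuming stand-in names throughout (none of the types below are the real Paimon or Spark classes):

// Stand-in traits mirroring the Spark mixins involved in the refactor.
trait SupportsReportPartitioning
trait SupportsReportOrdering
trait SupportsRuntimeV2Filtering

// Base layer holds what every scan shares (projection, filters, limit, ...).
abstract class BaseScan(val pushDownLimit: Option[Int])

// Common layer: version-agnostic partitioning/ordering reporting.
abstract class ScanCommon(pushDownLimit: Option[Int], val bucketedScanDisabled: Boolean)
  extends BaseScan(pushDownLimit)
  with SupportsReportPartitioning
  with SupportsReportOrdering

// The concrete scan keeps only the runtime-filtering mixin, like PaimonScan.
case class Scan(limit: Option[Int], disabled: Boolean = false)
  extends ScanCommon(limit, disabled)
  with SupportsRuntimeV2Filtering {
  def disableBucketedScan(): Scan = copy(disabled = true)
}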