[SPARK-27384][SQL] File source V2: Prune unnecessary partition columns #24296
Conversation
Test build #104276 has finished for PR 24296 at commit
retest this please.
Test build #104278 has finished for PR 24296 at commit
The test failures should be fixed after #24284 is merged.
Force-pushed from 88ce4fc to d16dbab
@@ -40,7 +45,23 @@ abstract class FileScan(
  protected def partitions: Seq[FilePartition] = {
    val selectedPartitions = fileIndex.listFiles(Seq.empty, Seq.empty)
    val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions)
    val partitionAttributes = fileIndex.partitionSchema.toAttributes
    val attributeMap = partitionAttributes.map(a => getAttributeName(a) -> a).toMap
    val readPartitionAttributes = readPartitionSchema.toAttributes.map { readAttr =>
we should not create an attribute and just use its name. This can be
readPartitionSchema.map { readField =>
attributeMap.get(normalize(readField.name)).getOrElse ...
}
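The reviewer's suggestion is to avoid materializing a throwaway attribute just to reuse its name: resolve each read field by its normalized name against a map built once from the partition attributes. A minimal standalone sketch of that idea (the `Attribute` case class, `normalize` helper, and sample column names here are hypothetical stand-ins for the Spark internals):

```scala
// Hypothetical sketch: resolve read fields by normalized name against a
// prebuilt name -> attribute map, rather than creating an attribute per field.
case class Attribute(name: String)

def normalize(name: String, caseSensitive: Boolean): String =
  if (caseSensitive) name else name.toLowerCase

val caseSensitive = false
val partitionAttributes = Seq(Attribute("Year"), Attribute("Month"))

// Built once, up front.
val attributeMap =
  partitionAttributes.map(a => normalize(a.name, caseSensitive) -> a).toMap

val readFieldNames = Seq("year") // fields requested by the scan
val readPartitionAttributes = readFieldNames.map { readName =>
  attributeMap.getOrElse(
    normalize(readName, caseSensitive),
    throw new IllegalArgumentException(s"Cannot find partition column $readName"))
}
// With case-insensitive resolution, "year" resolves to Attribute("Year").
```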
@@ -52,24 +52,24 @@ import org.apache.spark.util.SerializableConfiguration
case class OrcPartitionReaderFactory(
    sqlConf: SQLConf,
    broadcastedConf: Broadcast[SerializableConfiguration],
    resultSchema: StructType,
let's update the parameter doc in the classdoc.
Test build #104312 has finished for PR 24296 at commit
retest this please.
    StructType(fields)
  }

// Define as method instead of value, since `requiredSchema` is mutable.
but we should not create a set inside loop body. How about
def createRequiredNameset ...
...
val requiredNameSet = createRequiredNameset..
val fields = partitionSchema.fields.filter { field => ... }
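The point of this comment is a small performance fix: compute the set of required column names once, outside the filter, instead of rebuilding it for every field in the loop body. A minimal sketch, using a hypothetical `StructField` stand-in and made-up column names:

```scala
// Hypothetical sketch: hoist the required-name set out of the filter loop
// so it is built once, not once per partition field.
case class StructField(name: String)

val requiredFields =
  Seq(StructField("year"), StructField("month"))
val partitionFields =
  Seq(StructField("year"), StructField("month"), StructField("day"))

// Built a single time, before filtering.
val requiredNameSet: Set[String] = requiredFields.map(_.name).toSet

val prunedFields = partitionFields.filter(f => requiredNameSet.contains(f.name))
// "day" is dropped because the query does not read it.
```

Set membership is O(1) per lookup, so the filter stays linear in the number of partition fields rather than paying a set construction per field.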
LGTM except one comment
Test build #104313 has finished for PR 24296 at commit
Test build #104317 has finished for PR 24296 at commit
Test build #104318 has finished for PR 24296 at commit
thanks, merging to master!
What changes were proposed in this pull request?
When scanning file sources, we can prune unnecessary partition columns on constructing input partitions, so that:
How was this patch tested?
Existing unit tests.
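The core idea of the PR, pruning partition columns down to what the query reads while constructing input partitions, can be sketched in isolation. The `PartitionedFile` case class, column names, and `prune` helper below are simplified stand-ins for the Spark data structures, not the actual implementation:

```scala
// Hypothetical sketch: keep only the partition values for the columns the
// query actually reads, instead of attaching every partition column to
// each file split.
case class PartitionedFile(partitionValues: Seq[Any], path: String)

val partitionColumns = Seq("year", "month", "day")
val readColumns = Seq("year") // columns required by the query

// Indices of the read columns within the full partition schema.
val projection: Seq[Int] = readColumns.map(partitionColumns.indexOf)

def prune(file: PartitionedFile): PartitionedFile =
  file.copy(partitionValues = projection.map(file.partitionValues))

val f = PartitionedFile(Seq(2019, 4, 5),
  "/data/year=2019/month=4/day=5/f.parquet")
// After pruning, only the value for "year" is carried with the split.
```

Carrying fewer partition values per file split reduces the memory held by the driver and shipped to executors when a table has many partition columns but a query touches few of them.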