[SPARK-25894][SQL] Add a ColumnarFileFormat type which returns the column count for a given schema #22905

Closed
@@ -306,7 +306,15 @@ case class FileSourceScanExec(
      withOptPartitionCount
    }

-   withSelectedBucketsCount
+   val withOptColumnCount = relation.fileFormat match {
+     case columnar: ColumnarFileFormat =>
+       val sqlConf = relation.sparkSession.sessionState.conf
+       val columnCount = columnar.columnCountForSchema(sqlConf, requiredSchema)
+       withSelectedBucketsCount + ("ColumnCount" -> columnCount.toString)
Member

Can't we get the column count from the requiredSchema metadata?

Contributor Author

You can "guess-timate" the physical column count by counting the leaf fields in the ReadSchema metadata value, but the true answer is an implementation detail of the file format. For example, in the implementation of ColumnarFileFormat for Parquet, we convert the Catalyst schema to the Parquet schema before counting columns. I suppose a similar approach would be required for ORC and other columnar formats.

That being said, this new metadata value isn't really meant to provide new and essential information, per se. Its purpose is to provide easy-to-read, practical information that's useful for quickly validating that schema pruning is working as expected. For example, seeing that a query is reading all 423 columns from a table instead of 15 tells us pretty quickly that schema pruning is not working (unless we really are trying to read the entire table schema). I've found the ReadSchema value to be difficult to read in practice because of its terse syntax, and because its printout is truncated.
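For concreteness, here is a rough sketch of that distinction. It is not part of this PR: the schema below is made up, and the snippet assumes it runs somewhere the (package-private) converter is visible, e.g. inside the Parquet data source package or one of its tests.

import org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

// A map field is a single Catalyst field, but Parquet stores it as two leaf
// columns (key and value), so counting Catalyst leaves under-counts the
// physical columns here.
val catalystSchema = StructType(Seq(
  StructField("id", LongType),
  StructField("attrs", MapType(StringType, StringType))))

val parquetSchema = new SparkToParquetSchemaConverter(new SQLConf).convert(catalystSchema)
// catalystSchema has 2 leaf fields, but parquetSchema.getPaths.size is 3
// (id, plus the map's key and value columns).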

Contributor

Shall we only include this info when the columnar reader is on?

Member

Is this something we really should include in the metadata? If the purpose of this is to check whether column pruning works or not, logging should be good enough. Adding a trait for it sounds like overkill for the current status. Let's not add an abstraction based only on a rough guess that it can be generalised.

Contributor Author

logging should be good enough

What's your basis for this assertion?

Also, what kind of logging are you suggesting?

Member

Who wants that? If someone wants to put metadata somewhere in the physical plan, let them open a PR and make a case for it

No... I don't think we should add it only because it's been requested once. They look like the same kind of request to me. I will have no argument if this one is added and other people later request adding others. We should make it clear why this one specifically should be added. We're not going to add every piece of information to the metadata just because it's requested.

If the purpose of adding it is to check whether the pushdown is actually working or not, logging sounds appropriate for that purpose.

Member

I mean, I really think it's more appropriate to check whether something works as expected or not by logging.

That's speaking from experience, not conjecture.

I am not discounting your statement. Let's be very clear about why it should be put in the metadata rather than in a log. How and why would it be more useful than logging, and in what cases?

For clarification, the scope of this information is narrower than just checking whether column pruning is working or not, since we already print out the requested columns from the Spark side.

Contributor Author

I'll reiterate a sample use case:

Consider also the case of the beeline user connecting to a multiuser thriftserver. They are pretty far from the log file, whereas running an 'explain' is right there in the terminal.

This also matters to users planning or debugging queries in a Jupyter notebook, as we do at VideoAmp. The level of effort for these users to go to a driver log file is quite high compared to inspecting a query plan.

When you refer to logging, which log are you referring to? When would this information be logged? And at what log level?
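To make that use case concrete (illustrative only, not output from this PR, and the exact plan-string formatting may differ), this is roughly the check such a user would run directly in their session, using the contacts table from the test suite below:

// From beeline: EXPLAIN select name.middle from contacts
// From a notebook or spark-shell:
spark.sql("select name.middle from contacts").explain()
// The FileScan node's metadata would then show something like
//   ReadSchema: struct<name:struct<middle:string>>, ColumnCount: 1
// with no need to locate or grep the driver's log file.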

Member

That basically says logging is useless when using beeline. I don't think this info is super important to (non-advanced) users.

I mean log4j, which is Spark's logging module, and I meant the information you're including in the metadata. Maybe at info level? Or debug level.

logInfo("The number of actual column being pruned is blah blah")
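For illustration only (not part of this PR): one way that logging alternative might look if it were placed in ParquetFileFormat, where the converted read schema is available; sqlConf and requiredSchema here are stand-ins for whatever is actually in scope at that point.

// Hypothetical placement inside the Parquet reader path (not in this PR):
val converter = new SparkToParquetSchemaConverter(sqlConf)
val requestedColumnCount = converter.convert(requiredSchema).getPaths.size
logInfo(s"Reading $requestedColumnCount physical Parquet column(s) for the requested schema")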

Contributor Author

That basically says logging is useless when using beeline. I don't think this info is super important to (non-advanced) users.

My experience says otherwise, and advanced users use beeline and Jupyter, too.

+     case _ => withSelectedBucketsCount
+   }
+
+   withOptColumnCount
  }

  private lazy val inputRDD: RDD[InternalRow] = {
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

/**
* An optional mix-in for columnar [[FileFormat]]s. This trait provides some helpful metadata when
* debugging a physical query plan.
*/
private[sql] trait ColumnarFileFormat {
  _: FileFormat =>

  /** Returns the number of columns required to satisfy the given schema. */
  def columnCountForSchema(conf: SQLConf, schema: StructType): Int
}
@@ -55,6 +55,7 @@ import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}

class ParquetFileFormat
  extends FileFormat
  with ColumnarFileFormat
  with DataSourceRegister
  with Logging
  with Serializable {
@@ -72,6 +73,12 @@ class ParquetFileFormat

  override def equals(other: Any): Boolean = other.isInstanceOf[ParquetFileFormat]

  override def columnCountForSchema(conf: SQLConf, schema: StructType): Int = {
    val converter = new SparkToParquetSchemaConverter(conf)
    val parquetSchema = converter.convert(schema)
    parquetSchema.getPaths.size
  }

  override def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
@@ -217,6 +217,19 @@ class ParquetSchemaPruningSuite
      Row("Y.") :: Nil)
  }

test("ColumnCount metadata value for pruned query should equal the number of columns read") {
withContacts {
val query = sql("select name.middle from contacts")
val fileSourceScans =
query.queryExecution.executedPlan.collect {
case scan: FileSourceScanExec => scan
}
assert(fileSourceScans.size === 1)
val contactsFileScan = fileSourceScans(0)
assert(contactsFileScan.metadata("ColumnCount") === "1")
}
}

  private def testSchemaPruning(testName: String)(testThunk: => Unit) {
    test(s"Spark vectorized reader - without partition data column - $testName") {
      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {