[SPARK-14116][SQL] Implements buildReader() for ORC data source #11936
Conversation
Force-pushed from e219295 to 057b6f2.
Test build #54048 has finished for PR 11936 at commit
Test build #54169 has finished for PR 11936 at commit
```
@@ -81,10 +81,10 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
      val bucketColumns =
```
seems this variable is never used?
Yea... Didn't remove it since I haven't gone through that part thoroughly.
LGTM
Test build #54172 has finished for PR 11936 at commit
@marmbrus Unfortunately our current strategy for generating Spark partitions doesn't play well with the ORC version of `buildReader()`:

```scala
import org.apache.spark.sql.types._

// Just creates a random file that is large enough
val df = sqlContext.range(1000000).select(
  'id cast StringType as 'a,
  'id cast StringType as 'b,
  'id cast StringType as 'c
)

val path = "/tmp/large.orc"
df.write.mode("overwrite").orc(path)

sqlContext.sql(s"SET spark.sql.files.maxPartitionBytes=${1024 * 1024}")
sqlContext.sql(s"SET spark.sql.parquet.enableVectorizedReader=false")

sqlContext.read.orc(path).count() // <-- Gives 500,000 instead of 1,000,000
```

Please refer to the discussion here for details.
Test build #54182 has finished for PR 11936 at commit
Just realized the problem while washing dishes... The above comment is a false alarm. The real problem is that I ignored the start position and length of the `FileSplit`.
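For context, here is a minimal sketch of what honoring those split boundaries looks like when creating the ORC record reader. The `file.filePath`/`file.start`/`file.length` fields and the `conf` value are assumed from the surrounding `buildReader()` scope, so treat this as an illustration rather than the exact patch:

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat
import org.apache.hadoop.mapreduce.TaskAttemptID
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

// Build a FileSplit covering only this Spark partition's byte range.
// Ignoring file.start and file.length is exactly the bug described above.
val fileSplit = new FileSplit(
  new Path(file.filePath), file.start, file.length, Array.empty[String])

val attemptContext = new TaskAttemptContextImpl(conf, new TaskAttemptID())

// OrcNewInputFormat accepts a plain FileSplit, so no OrcNewSplit is needed
// (see the PR description below).
val orcRecordReader =
  new OrcNewInputFormat().createRecordReader(fileSplit, attemptContext)
```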
```scala
// Appends partition values
val fullOutput = dataSchema.toAttributes ++ partitionSchema.toAttributes
val joinedRow = new JoinedRow()
val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput)
```
Should we move this out of the function? Then we don't need to create them every time this function is used.
Yea, we should. Michael also mentioned this in this thread. We can do it in a follow-up PR after finishing other data sources to avoid conflicts.
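A rough sketch of the kind of hoisting being discussed, assuming `buildReader()` returns a per-file closure (names are illustrative, not quoted from the patch). One caveat: anything hoisted out of the closure is captured and shipped to executors, so it must be serializable, which is why the generated projection itself may still need to live inside the per-file function:

```scala
// Hypothetical shape of buildReader(); dataSchema and partitionSchema are
// assumed to be in scope, as in the snippet above.
def buildReader(/* ... */): PartitionedFile => Iterator[InternalRow] = {
  // Hoisted: computed once per buildReader() call instead of once per file.
  val fullOutput = dataSchema.toAttributes ++ partitionSchema.toAttributes

  (file: PartitionedFile) => {
    // Cheap, mutable helpers can stay per-file; the generated projection is
    // kept here because generated classes don't serialize with the closure.
    val joinedRow = new JoinedRow()
    val appendPartitionColumns =
      GenerateUnsafeProjection.generate(fullOutput, fullOutput)
    // ... read rows from `file`, join with partition values, project ...
    Iterator.empty[InternalRow]
  }
}
```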
Test build #54183 has finished for PR 11936 at commit
```scala
// iterator.
val maybePhysicalSchema = OrcFileOperator.readSchema(Seq(file.filePath), Some(conf))

maybePhysicalSchema.fold(Iterator.empty: Iterator[InternalRow]) { physicalSchema =>
```
I feel that using a fold here makes the code harder to understand.
How about we use a more straightforward way?
Done.
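For readers following the thread, a small sketch contrasting the two styles; `buildIterator` is a hypothetical stand-in for the real iterator construction:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

// Hypothetical stand-in for building the row iterator from a physical schema.
def buildIterator(physicalSchema: StructType): Iterator[InternalRow] =
  Iterator.empty

val maybePhysicalSchema: Option[StructType] = None

// The fold version the review found hard to read:
val rows1 =
  maybePhysicalSchema.fold(Iterator.empty: Iterator[InternalRow])(buildIterator)

// The more straightforward alternative: an explicit pattern match.
val rows2 = maybePhysicalSchema match {
  case Some(physicalSchema) => buildIterator(physicalSchema)
  case None => Iterator.empty
}
```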
Test build #54254 has finished for PR 11936 at commit
```diff
  if (files.fileFormat.toString == "TestFileFormat" ||
-     files.fileFormat.isInstanceOf[parquet.DefaultSource]) &&
+     files.fileFormat.isInstanceOf[parquet.DefaultSource] ||
+     files.fileFormat.toString == "ORC") &&
      files.sqlContext.conf.parquetFileScan =>
```
Let's rename this conf.
Thanks. I am merging this to master. @liancheng @cloud-fan Let's address https://github.com/apache/spark/pull/11936/files#r57520723 in either of your PRs for the other formats.
What changes were proposed in this pull request?

This PR implements `FileFormat.buildReader()` for our ORC data source. It also fixes several minor styling issues related to the `HadoopFsRelation` planning code path.

Note that `OrcNewInputFormat` doesn't rely on `OrcNewSplit` for creating `OrcRecordReader`s; a plain `FileSplit` is just fine. That's why we can simply create the record reader with the help of `OrcNewInputFormat` and `FileSplit`.

How was this patch tested?

Existing test cases should do the work.