[SPARK-19085][SQL] cleanup OutputWriterFactory and OutputWriter #16479
Conversation
```diff
@@ -64,18 +64,18 @@ object FileFormatWriter extends Logging {
     val outputWriterFactory: OutputWriterFactory,
     val allColumns: Seq[Attribute],
     val partitionColumns: Seq[Attribute],
-    val nonPartitionColumns: Seq[Attribute],
+    val dataColumns: Seq[Attribute],
```
Rename `nonPartitionColumns` to `dataColumns`, to be consistent with other places in the codebase.
Test build #70932 has finished for PR 16479 at commit
What is the benefit of making these changes?
@yhuai It removes unnecessary code to make the codebase easier to maintain. Besides, the libsvm relation should be a little faster as it doesn't need to go through a converter.
Test build #70947 has finished for PR 16479 at commit
```scala
protected[sql] def writeInternal(row: InternalRow): Unit = {
  write(converter(row))
}
```
I found the original PR that introduced these lines: https://github.com/apache/spark/pull/8010/files
```scala
def newWriter(path: String): OutputWriter = {
  throw new UnsupportedOperationException("newInstance with just path not supported")
}
```
The usage of this function was removed in https://github.com/apache/spark/pull/15710/files, so I think it is safe to remove it.
```scala
override def write(row: Row): Unit = {
  val label = row.get(0)
  val vector = row.get(1).asInstanceOf[Vector]
  // This `asInstanceOf` is safe because it's guaranteed by `LibSVMFileFormat.verifySchema`
```
`LibSVMFileFormat.verifySchema` is only called in `buildReader`, but this is the write path, right?
OK, I added the verification.
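The check discussed here can be sketched as follows. This is a hedged illustration, not the real `LibSVMFileFormat.verifySchema` (which operates on Spark's `StructType`); the `Field` type and `LibSvmSchemaCheck` object are hypothetical stand-ins.

```scala
// Hypothetical stand-in for a schema field; real code uses Spark's StructType/StructField.
case class Field(name: String, dataType: String)

object LibSvmSchemaCheck {
  // Require a two-column (label: double, features: vector) schema before the
  // write path casts row values without checking, as in the snippet above.
  def verifySchema(schema: Seq[Field]): Unit = {
    require(
      schema.length == 2 &&
        schema(0).dataType == "double" &&
        schema(1).dataType == "vector",
      s"Illegal schema for libsvm data, schema=${schema.mkString(",")}")
  }
}
```

Calling such a check once on the write path makes the later `asInstanceOf` casts safe by construction.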
Test build #70971 has finished for PR 16479 at commit

Test build #70973 has finished for PR 16479 at commit
LGTM
Thanks for the review, merging to master!
## What changes were proposed in this pull request?

`OutputWriterFactory`/`OutputWriter` are internal interfaces and we can remove some unnecessary APIs:

1. `OutputWriterFactory.newWriter(path: String)`: no one calls it and no one implements it.
2. `OutputWriter.write(row: Row)`: during execution we only call `writeInternal`, which is weird as `OutputWriter` is already an internal interface. We should rename `writeInternal` to `write`, and remove `def write(row: Row)` and its related converter code. All implementations should just implement `def write(row: InternalRow)`.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#16479 from cloud-fan/hive-writer.
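The shape of the change can be sketched with simplified stand-in types (these are not the real `org.apache.spark.sql` classes). Before this PR, implementations wrote external `Row`s while the execution path called `writeInternal`, which converted first; after, implementations consume `InternalRow` directly and the converter disappears.

```scala
// Hypothetical stand-ins for Spark's row types, for illustration only.
case class InternalRow(values: Seq[Any])
case class Row(values: Seq[Any])

// Before this PR: implementations provided write(Row), but execution called
// writeInternal, which converted InternalRow -> Row on every record.
abstract class OutputWriterBefore {
  def write(row: Row): Unit
  private val converter: InternalRow => Row = ir => Row(ir.values)
  protected def writeInternal(row: InternalRow): Unit = write(converter(row))
}

// After this PR: a single entry point taking InternalRow directly.
abstract class OutputWriterAfter {
  def write(row: InternalRow): Unit
}
```

Dropping the per-record conversion is also what makes the libsvm writer slightly faster, as noted earlier in the thread.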
How "internal" are these interfaces really? Every time a change like this is made, spark-avro breaks, since spark-avro implements these interfaces.
Everything in this package is internal. Ideally we should not change any interface if unnecessary, but this change is reasonable. As an internal interface, it's more efficient to use `InternalRow`.
I will just copy the conversion code over for now, thanks.
So it turns out just copying the conversion code doesn't work, as seen in databricks/spark-avro#240, and now I'm running into the same thing writing my own datasource. Since a datasource in the end requires implementing a class that extends `OutputWriter`, and the `OutputWriter` interface changed, a datasource plugin doesn't seem to be able to support both pre- and post-2.2.x versions in the same plugin. Any suggestions on how to handle this, without requiring users to match the Spark version to the datasource version?
This is a common issue of data source v1: it's not powerful enough, so you have to use some Spark internal APIs and hit compatibility problems. AFAIK a workable solution is to create different branches for different Spark versions, or use some dirty reflection workarounds.
I'd be interested in the "dirty reflection workarounds", if you have examples. Not sure how I'd use reflection to handle conflicting interface definitions, but I'd love to learn.
Here is a better solution I found: https://github.com/databricks/spark-avro/pull/217/files#diff-3086eddba29f4034c324541695a2357b — it implements different versions of the writer for different Spark versions.
So it essentially compiles each implementation against a different Spark version, then both bytecodes are included in the final jar, and reflection is used to instantiate the right one. That works, without too much pain. Might go that route, thanks.
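The dispatch described above can be sketched as follows. This is a hedged illustration: the class names (`com.example.AvroWriterPre22`, `com.example.AvroWriterSpark22`) are hypothetical, standing in for writer implementations compiled against different Spark versions and bundled in the same jar.

```scala
object WriterShim {
  // Decide which bundled implementation matches the running Spark version.
  // Spark 2.2 is the cutoff, since that is where OutputWriter.write(Row) was removed.
  def writerClassName(sparkVersion: String): String = {
    val Array(major, minor) = sparkVersion.split("\\.").take(2).map(_.toInt)
    if (major > 2 || (major == 2 && minor >= 2)) "com.example.AvroWriterSpark22"
    else "com.example.AvroWriterPre22"
  }

  // Instantiate the chosen class by name. Both implementations would extend a
  // shared trait (hypothetical here) so callers can use one static type.
  def newWriter(sparkVersion: String): AnyRef =
    Class.forName(writerClassName(sparkVersion))
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[AnyRef]
}
```

At runtime only the matching class is ever loaded, so the other one can safely reference an interface that doesn't exist in the running Spark version.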