SPARK-23325: Use InternalRow when reading with DataSourceV2. #21118
Changes to the `DataSourceReader` Java interface:

```diff
@@ -20,7 +20,7 @@
 import java.util.List;

 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.ReadSupport;
 import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
@@ -43,7 +43,7 @@
  * Names of these interfaces start with `SupportsScan`. Note that a reader should only
  * implement at most one of the special scans, if more than one special scans are implemented,
  * only one of them would be respected, according to the priority list from high to low:
- * {@link SupportsScanColumnarBatch}, {@link SupportsScanUnsafeRow}.
+ * {@link SupportsScanColumnarBatch}, {@link SupportsDeprecatedScanRow}.
  *
  * If an exception was throw when applying any of these query optimizations, the action will fail
  * and no Spark job will be submitted.
@@ -76,5 +76,5 @@ public interface DataSourceReader {
   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
   * submitted.
   */
-  List<InputPartition<Row>> planInputPartitions();
+  List<InputPartition<InternalRow>> planInputPartitions();
 }
```
Review thread on `planInputPartitions()`:

Comment: I am sorry for a question in an old PR like this, and I think this might not be directly related to this PR, but please allow me to ask a question here. Does this mean developers should produce `InternalRow`?

Reply: The rationale is, data source v2 is not stable yet, and we should make it usable first, to get more people to implement data sources and provide feedback. Eventually we should design a stable and efficient row builder in data source v2, but for now we should switch to `InternalRow`.

Comment: Ah, okie. thanks!
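For readers following along, here is a minimal sketch of what a source looks like under the new signature, written against the Spark 2.4-era `sources.v2.reader` API. The class names and the three-row data set are invented for illustration; this is not code from the PR:

```scala
import java.util.{Collections, List => JList}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, InputPartitionReader}
import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical reader showing the new contract: partitions now produce
// InternalRow instead of the public Row.
class ExampleReader extends DataSourceReader {
  override def readSchema(): StructType =
    new StructType().add("id", LongType).add("name", StringType)

  override def planInputPartitions(): JList[InputPartition[InternalRow]] =
    Collections.singletonList(new ExamplePartition)
}

class ExamplePartition extends InputPartition[InternalRow] {
  override def createPartitionReader(): InputPartitionReader[InternalRow] =
    new InputPartitionReader[InternalRow] {
      private var i = -1L
      override def next(): Boolean = { i += 1; i < 3 }
      // A generic InternalRow is enough here; the planner adds a projection
      // that produces UnsafeRow for operators that need it.
      override def get(): InternalRow =
        InternalRow(i, UTF8String.fromString(s"row-$i"))
      override def close(): Unit = ()
    }
}
```

Note that string columns must be stored as `UTF8String`, since `InternalRow` holds Spark's internal representations rather than JVM `String`s.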
Changes to `DataSourceV2Strategy`:

```diff
@@ -125,16 +125,13 @@ object DataSourceV2Strategy extends Strategy {
       val filterCondition = postScanFilters.reduceLeftOption(And)
       val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)

-      val withProjection = if (withFilter.output != project) {
-        ProjectExec(project, withFilter)
-      } else {
-        withFilter
-      }
-
-      withProjection :: Nil
+      // always add the projection, which will produce unsafe rows required by some operators
+      ProjectExec(project, withFilter) :: Nil

     case r: StreamingDataSourceV2Relation =>
-      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
+      // ensure there is a projection, which will produce unsafe rows required by some operators
+      ProjectExec(r.output,
+        DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader)) :: Nil

     case WriteToDataSourceV2(writer, query) =>
       WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil
```

Review thread on the streaming case:

Comment: The continuous streaming scan always returns unsafe rows; will we introduce a regression here? cc @jose-torres

Reply: For now I think it's safer to still require `UnsafeRow`.

Reply: Continuous processing will still be experimental in the 2.4 release, so I'm not tremendously concerned about this. We should eventually change the scan to produce rows in whatever way is most efficient in the final API.

Reply: It is perfectly fine for sources to produce `UnsafeRow` because it is an `InternalRow`. I think it is important for us to get to `InternalRow` in this release. `UnsafeRow` is too hard to produce, and the easiest thing to do is to produce `InternalRow` and then call into Spark's `UnsafeProjection` to produce `UnsafeRow`. That's painful, uses internal APIs, and is slower.
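To make that last point concrete, this is roughly the dance being described, sketched with Spark's internal (and unstable) expression API; the schema and values are placeholders. After this PR, the `ProjectExec` inserted by the strategy performs the equivalent conversion once, instead of every source hand-rolling it:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String

val schema = new StructType().add("id", LongType).add("name", StringType)

// Internal API: compiles a projection from InternalRow to UnsafeRow.
val toUnsafe = UnsafeProjection.create(schema)

val row: InternalRow = InternalRow(1L, UTF8String.fromString("a"))
// The projection reuses its output buffer across calls, so the result
// must be copied before being cached anywhere.
val unsafe: UnsafeRow = toUnsafe(row)
```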
Review thread on the Kafka source:

Comment: Can we get rid of `KafkaRecordToUnsafeRowConverter`? Since Spark would do an unsafe projection at the end, here we should just return `GenericInternalRow` instead of `UnsafeRow`, to save a data copy.

Reply: We can, but this is intended to make minimal changes. We can add optimizations like this in a follow-up.
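A sketch of that suggested follow-up, under the assumption that the planner's trailing projection now produces the final `UnsafeRow`. The helper below is hypothetical, not the real Kafka source code; it shows a record mapped straight to a `GenericInternalRow`, skipping the per-record unsafe conversion:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical record-to-row helper: binary columns stay as Array[Byte]
// and strings become UTF8String, which is all a GenericInternalRow needs.
def recordToRow(topic: String, offset: Long, key: Array[Byte], value: Array[Byte]): InternalRow =
  new GenericInternalRow(Array[Any](UTF8String.fromString(topic), offset, key, value))
```

Without the projection added by the strategy above, handing such rows to operators that require `UnsafeRow` would break, which is exactly why the strategy now always inserts one.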