
SPARK-23325: Use InternalRow when reading with DataSourceV2. #21118

Closed

Conversation

rdblue (Contributor) commented Apr 20, 2018

What changes were proposed in this pull request?

This updates the DataSourceV2 API to use InternalRow instead of Row for the default case with no scan mix-ins.

Support for readers that produce Row is added through SupportsDeprecatedScanRow, which matches the previous API. Readers that used Row now implement this class and should be migrated to InternalRow.

Readers that previously implemented SupportsScanUnsafeRow have been migrated to use no SupportsScan mix-ins and produce InternalRow.
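For illustration, here is a minimal sketch (not code from this patch) of the kind of value an InternalRow-producing reader now returns; the two-column schema (id bigint, data string) is just an assumption for the example:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.unsafe.types.UTF8String

object InternalRowExample {
  // Build one InternalRow using catalyst's internal representations:
  // Long for bigint and UTF8String (not java.lang.String) for string columns.
  def exampleRow(): InternalRow =
    new GenericInternalRow(Array[Any](1L, UTF8String.fromString("a")))
}
```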

How was this patch tested?

This uses existing tests.

SparkQA commented Apr 20, 2018

Test build #89661 has finished for PR 21118 at commit eddd049.

  • This patch fails RAT tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

rdblue force-pushed the SPARK-23325-datasource-v2-internal-row branch from eddd049 to 72f3c1a on April 20, 2018 at 20:51
rdblue (Contributor, Author) commented Apr 20, 2018

@jose-torres, @cloud-fan, can you take a look at this? It updates the v2 API to use InternalRow by default in the read path. I'll follow up with a patch for the write path, or we can include it here if you'd prefer.

SparkQA commented Apr 20, 2018

Test build #89662 has finished for PR 21118 at commit 72f3c1a.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Apr 20, 2018

Test build #89663 has finished for PR 21118 at commit 2115529.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

jose-torres (Contributor):

Generally looks good.

IIRC, there's some arcane reason why plan nodes need to produce UnsafeRow even though SparkPlan.execute() declares InternalRow. So we may need to add a projection in DataSourceV2ScanExec.

rdblue (Contributor, Author) commented Apr 20, 2018

Yeah, we should probably add a projection. It's probably only working because the InternalRows that are produced are all UnsafeRow.
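A rough sketch of the projection being discussed, assuming it wraps the scan's output iterator (illustrative only, not the actual DataSourceV2ScanExec change):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.StructType

object ToUnsafeSketch {
  // Convert whatever InternalRows the reader produces into UnsafeRows so that
  // downstream operators that expect UnsafeRow keep working.
  def toUnsafe(rows: Iterator[InternalRow], schema: StructType): Iterator[InternalRow] = {
    val proj = UnsafeProjection.create(schema)
    rows.map(proj) // the projection reuses its result buffer across rows
  }
}
```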

SparkQA commented Apr 20, 2018

Test build #89665 has finished for PR 21118 at commit 6006123.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

dongjoon-hyun (Member):

@rdblue, could you fix the remaining KafkaMicroBatchSourceSuite.scala, too?

[error] /home/jenkins/workspace/SparkPullRequestBuilder@2/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:676: value createUnsafeRowReaderFactories is not a member of org.apache.spark.sql.kafka010.KafkaMicroBatchReader
[error]         val factories = reader.createUnsafeRowReaderFactories().asScala

* changed in the future Spark versions.
*/
@InterfaceStability.Unstable
public interface SupportsScanUnsafeRow extends DataSourceReader {
Contributor:

I think we still need this trait.

In Spark SQL there is a contract that all operators produce UnsafeRow, except some Dataset-related operators. We have operators that assume the input rows are UnsafeRow and do a type cast, e.g. operators that use GenerateUnsafeRowJoiner.

That is to say, the data source scan has to do an unsafe projection to make sure it produces unsafe rows. This will be a waste if the data source already produces unsafe rows.

For file-based data sources, we solve this issue by adding a flag, needsUnsafeRowConversion, to decide whether we need the unsafe projection. Another solution is what @rdblue proposed in the dev list discussion: do an isInstanceOf[UnsafeRow] check for each input row and skip the unsafe projection if the row is already an UnsafeRow. That may have a performance penalty because of the per-row check.

So this trait is still useful, at least for the built-in file-based data sources.
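A sketch of the per-row check described above (an illustration of the idea, not Spark code):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}

object PerRowCheckSketch {
  // Skip the unsafe projection when the source already handed us an UnsafeRow;
  // the match on every row is the per-row cost mentioned above.
  def ensureUnsafe(rows: Iterator[InternalRow], proj: UnsafeProjection): Iterator[UnsafeRow] =
    rows.map {
      case u: UnsafeRow => u       // already unsafe, use as-is
      case r            => proj(r) // convert safe rows field by field
    }
}
```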

Contributor (Author):

The check I was referring to is implemented in generated code. The projection added in DataSourceV2Strategy handles the cases where part or all of the incoming row is UnsafeRow.

@@ -86,7 +87,7 @@ class KafkaContinuousReader(
   KafkaSourceOffset(JsonUtils.partitionOffsets(json))
 }

-override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
+override def createReadTasks(): ju.List[ReadTask[InternalRow]] = {
Contributor:

Please do the renaming in an individual PR.

Contributor (Author):

I've moved this to #21145. I'll rebase this PR on that one, so let's try to get that in first.

rdblue force-pushed the SPARK-23325-datasource-v2-internal-row branch from 6006123 to 16f1b6e on April 25, 2018 at 23:06
SparkQA commented Apr 26, 2018

Test build #89863 has finished for PR 21118 at commit 16f1b6e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

rdblue (Contributor, Author) commented May 2, 2018

@cloud-fan and @jose-torres: I looked at explain codegen for reading from a Parquet table (with vectorized reads disabled) and it doesn't look like there is a dependency on UnsafeRow:

explain codegen select * from test
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*FileScan parquet rblue.test[id#40L,data#41] Batched: false, Format: Parquet, Location: InMemoryFileIndex[s3://bucket/warehouse/blue.db/test/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,data:string>

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
/* 009 */   private scala.collection.Iterator scan_input;
/* 010 */
/* 011 */   public GeneratedIterator(Object[] references) {
/* 012 */     this.references = references;
/* 013 */   }
/* 014 */
/* 015 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 016 */     partitionIndex = index;
/* 017 */     this.inputs = inputs;
/* 018 */     this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 019 */     scan_input = inputs[0];
/* 020 */
/* 021 */   }
/* 022 */
/* 023 */   protected void processNext() throws java.io.IOException {
/* 024 */     while (scan_input.hasNext()) {
/* 025 */       InternalRow scan_row = (InternalRow) scan_input.next();
/* 026 */       scan_numOutputRows.add(1);
/* 027 */       append(scan_row);
/* 028 */       if (shouldStop()) return;
/* 029 */     }
/* 030 */   }
/* 031 */ }

I've looked at a few simple queries with filters, projects, and aggregation and it doesn't look like any of the generated code depends on UnsafeRow. Can anyone confirm that it is not a requirement to pass UnsafeRow into generated code?

If there is no requirement for the rows to be UnsafeRow, then is it necessary to add an UnsafeProjection or would the copy to unsafe make execution slower?

If the rows passed from the data source are UnsafeRow, then UnsafeProjection detects it and copies the row buffer (see examples). That's faster than copying individual values, but slower than just using the row as-is. Not adding a projection would make this case faster.

If the rows passed from the data source are InternalRow and not UnsafeRow, then we could copy them immediately, but it is very likely that the data is already going to be copied. A projection, for example, immediately copies all of the data out of the UnsafeRow and an initial copy to unsafe is just extra work. Similarly, a filter is probably selective enough that it makes sense to wait until after the filter runs to copy the entire row of data to unsafe.

In all of the cases that I've looked at, a copy to unsafe would only slow down execution. Unsafe rows may have better cache locality, but the copy reads all of the data anyway.

If I'm right and we do not need to insert a copy to UnsafeRow, then we don't need the SupportsScanUnsafeRow trait. Data sources can still produce UnsafeRow and it would work without a problem. The only time we need to know that an InternalRow is actually an UnsafeRow is if we are adding a projection to unsafe, in which case we could avoid a copy of the row buffer.
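As an illustration of the copy costs above, here is a sketch (assumptions about usage, not Spark's generated code) of the two conversion paths:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}

object CopyCostSketch {
  // `proj` would be created once per partition in practice. An UnsafeRow can be
  // duplicated with a single copy of its backing buffer, while a safe row has to
  // be rewritten field by field before it can be copied.
  def copyToUnsafe(row: InternalRow, proj: UnsafeProjection): UnsafeRow = row match {
    case u: UnsafeRow => u.copy()
    case other        => proj(other).copy()
  }
}
```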

cloud-fan (Contributor):

The parquet scan doesn't need unsafe rows because it outputs ColumnarBatch. Note that UnsafeRow is the data format Spark uses to exchange data between operators, but whole-stage codegen can merge several operators into one. So in your case, a parquet scan followed by some simple operators like filter and project is still one operator, so you won't see UnsafeRow.

I believe if you add an aggregate, you will see UnsafeRow in the final aggregate's generated code, which comes from the ShuffleExchangeExec operator.
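One way to check this (a spark-shell sketch; the table name and query are placeholders) is to disable vectorized reads and dump the generated code for an aggregate query, then look for UnsafeRow in the final aggregate's stage:

```scala
import org.apache.spark.sql.execution.debug._

spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
// Prints the whole-stage-codegen subtrees and their generated Java code.
spark.sql("SELECT data, count(*) FROM test GROUP BY data").debugCodegen()
```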

rdblue (Contributor, Author) commented May 3, 2018

@cloud-fan, actually I tried a lot of different queries yesterday, including joins and aggregations with vectorized reads turned off. The only thing that didn't work was collect for a select * from t because SparkPlan assumes that the rows will be unsafe.

I'm planning to do more testing, but I don't see anything that requires UnsafeRow in generated code. @marmbrus, what was the original intent in codegen? Should codegen use InternalRow or UnsafeRow?

cloud-fan (Contributor):

All the places that use GenerateUnsafeRowJoiner assume the input rows are unsafe rows.

ShuffleExchangeExec assumes its input is unsafe rows, because its serializer is UnsafeRowSerializer.

Note that we don't enforce this at the API level, i.e. SparkPlan.execute still returns RDD[InternalRow]. This is because we have exceptions: the object-related operators can return safe rows; they always appear in a group, and the last operator in the group will output unsafe rows.

That said, you may not be able to see UnsafeRow in the generated code, but you will get ClassCastException if you don't follow this rule and output safe rows.
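A tiny illustration of that failure mode (hypothetical code, not from Spark): anything that casts its input to UnsafeRow blows up at runtime when it receives a safe row.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}

object UnsafeAssumptionDemo {
  // Roughly what an operator that assumes unsafe input effectively does.
  def assumesUnsafe(row: InternalRow): Int =
    row.asInstanceOf[UnsafeRow].getSizeInBytes

  def main(args: Array[String]): Unit = {
    val safeRow: InternalRow = new GenericInternalRow(Array[Any](1L))
    assumesUnsafe(safeRow) // throws ClassCastException
  }
}
```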

rdblue (Contributor, Author) commented May 4, 2018

@cloud-fan, let me clarify what I'm getting at here.

It appears that Spark makes at least one copy of the data to unsafe when reading any Parquet row. If the projection includes partition columns, then Spark makes a second copy to unsafe. Making two copies of every row read from Parquet is fairly common, even when the plan doesn't need the data to be unsafe.

Most of the operators I've been looking at, including codegen operators, support InternalRow. If we can get rid of two copies of every row, then we should look into it. It is even more important if it means less headache for implementers of the V2 API. If it doesn't matter to Spark internals whether an implementation produces InternalRow or UnsafeRow, then we don't need an extra trait, SupportsScanUnsafeRow, or any code to handle it in Spark. Simpler APIs are a good thing.

What we need to find out is:

  1. Is there a strong expectation (i.e., a design requirement) that data sources produce UnsafeRow? (This is why I'm asking @marmbrus, but I'm not sure who the best person to ask is.)
  2. How many places actually do depend on UnsafeRow and not InternalRow? I've found one and you pointed out another. But we could update the joiner fairly easily and can probably update other places, too.

This is something we should look into now because it has the potential to be a good speed improvement for queries that use nested data (and can't use the vectorized read path). In addition, it will only get harder to remove needless dependence on UnsafeRow later.

rdblue (Contributor, Author) commented May 4, 2018

I just did a performance test based on our Spark 2.1.1 build and a real table. I tested a full scan of an hour of data with a single data filter.

The scan had 13,083 tasks and read 1084.8 GB. I used identical Spark applications with 100 executors, each with 1 core and 6 GB memory.

  • With project to UnsafeRow: wall time: 12m, total task time: 19h, longest task: 51s.
  • Without projection, using InternalRow: wall time: 11m, total task time: 17.8h, longest task: 26s.

Clearly, this is not a benchmark, but it shows a 6% performance improvement from not making unnecessary copies. Eliminating copies is a pretty easy way to get better performance if we can update a few operators to work with both InternalRow and UnsafeRow.

cloud-fan (Contributor):

@rdblue, this is a good point. Since not all operators need unsafe rows, we can save the copy on the data source side if we don't need to produce unsafe rows. Actually we had such a mechanism before: #10511

But I'm not sure it's worth bringing back. We expect data sources to produce ColumnarBatch for better performance, and the performance of the row interface is not that important. Actually, SupportsScanUnsafeRow is only there to avoid a perf regression when migrating file sources. If you think that's not a good public API, we can move it to an internal package and only use it for file sources.

rdblue (Contributor, Author) commented May 7, 2018

> Actually, SupportsScanUnsafeRow is only there to avoid a perf regression when migrating file sources. If you think that's not a good public API, we can move it to an internal package and only use it for file sources.

I don't think it is a good idea to introduce additions for file sources. Part of the motivation for the v2 API is to get rid of those. Besides, I don't think we need it if we handle conversion in Spark instead of in the data sources.

I think we should update the physical plan and push both filters and projections into the v2 scan node. Then data sources won't need to produce UnsafeRow but we can guarantee that the scan node produces UnsafeRow, which it would already do in most cases because it includes a projection. I'll open a PR for this, or I can include the change here if you prefer.

rdblue (Contributor, Author) commented May 7, 2018

> We expect data sources to produce ColumnarBatch for better performance, and the performance of the row interface is not that important.

I disagree. The vectorized path isn't used for all Parquet table scans and we should continue to care about its performance.

rdblue (Contributor, Author) commented May 8, 2018

@cloud-fan: This PR is also related to #21262 because that PR updates the conversion from logical to physical plan and handles projections and filtering. We could modify that strategy to always ensure that there is a ProjectExec wrapping the scan, which will convert to unsafe. If we did that, then only ProjectExec and FilterExec would need to support InternalRow, which they already do.
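A very rough sketch of that idea, with `scan`, `output`, and `condition` as placeholders (this is not the actual #21262 strategy code):

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}

object V2PlanSketch {
  // Evaluate any remaining filter on the scan's InternalRows, then always put a
  // ProjectExec on top so the subtree ends with a node that converts to unsafe.
  def planScan(scan: SparkPlan, output: Seq[Attribute], condition: Option[Expression]): SparkPlan = {
    val filtered = condition.map(FilterExec(_, scan)).getOrElse(scan)
    ProjectExec(output, filtered)
  }
}
```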

cloud-fan (Contributor):

> If we did that, then only ProjectExec and FilterExec would need to support InternalRow, which they already do.

This partially brings #10511 back, and we need to plan the project and filter together with the data source scan to make sure an unsafe row is produced at the end.

If we want to go this way, I think we should fully bring back #10511 to make this contract explicit, i.e. which operators produce unsafe rows and which operators only accept unsafe rows as input. After this, file sources would not need SupportsScanUnsafeRow to avoid a perf regression.

On the other hand, do you think it's possible for a data source to directly produce unsafe rows? e.g. it copies its data to unsafe rows directly, without an intermediate InternalRow. The point is, SupportsScanUnsafeRow is very easy to maintain, compared to SupportsScanColumnarBatch.

rdblue (Contributor, Author) commented May 8, 2018

> If we want to go this way, I think we should fully bring back #10511 to make this contract explicit, i.e. which operators produce unsafe rows and which operators only accept unsafe rows as input.

Whether internal operators should expect UnsafeRow and fail on InternalRow is a design question that we should discuss on the dev list. I'll start a thread.

The solution I'm suggesting would ensure that sources can produce InternalRow, and Spark will convert to unsafe if it is required by the plan (when there isn't already a projection). For now, since we know there are places that fail, we would make sure there is a projection before any nodes other than filter.

This would be better than having the data sources handle conversion to unsafe because it avoids extra copies when Spark needs to project anyway and moves the conversion after any filters on top of the scan.

rdblue force-pushed the SPARK-23325-datasource-v2-internal-row branch from 16f1b6e to b5d2c9f on May 10, 2018 at 21:15
SparkQA commented May 10, 2018

Test build #90478 has finished for PR 21118 at commit b5d2c9f.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented May 10, 2018

Test build #90480 has finished for PR 21118 at commit 07dc4fc.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

rdblue (Contributor, Author) commented Jul 20, 2018

Rebased on master to fix conflicts.

SparkQA commented Jul 20, 2018

Test build #93365 has finished for PR 21118 at commit d1fa32e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

felixcheung (Member):

So where are we on this? It looks like we have two LGTMs?

rdblue (Contributor, Author) commented Jul 23, 2018

@cloud-fan, any update on merging this?

cloud-fan (Contributor):

retest this please

cloud-fan (Contributor):

LGTM, let's merge it when the tests pass (the last pass was 4 days ago)

SparkQA commented Jul 24, 2018

Test build #93500 has finished for PR 21118 at commit d1fa32e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

gatorsmile (Member):

Thanks! Merged to master.

asfgit closed this in 9d27541 on Jul 24, 2018
rdblue (Contributor, Author) commented Jul 24, 2018

Thanks for reviewing and merging @cloud-fan, @gatorsmile, @felixcheung!

cloud-fan added a commit to cloud-fan/spark that referenced this pull request Aug 1, 2018
## What changes were proposed in this pull request?

This is a follow-up of apache#21118.

In apache#21118 we added `SupportsDeprecatedScanRow`. Ideally data source should produce `InternalRow` instead of `Row` for better performance. We should remove `SupportsDeprecatedScanRow` and encourage data sources to produce `InternalRow`, which is also very easy to build.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#21921 from cloud-fan/row.
asfgit pushed a commit that referenced this pull request Aug 6, 2018
## What changes were proposed in this pull request?

A follow up of #21118

Since we use `InternalRow` in the read API of data source v2, we should do the same thing for the write API.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21948 from cloud-fan/row-write.
@@ -76,5 +76,5 @@
 * If this method fails (by throwing an exception), the action will fail and no Spark job will be
 * submitted.
 */
-List<InputPartition<Row>> planInputPartitions();
+List<InputPartition<InternalRow>> planInputPartitions();
Member:

I'm sorry for asking a question in an old PR like this, and it might not be directly related to this PR, but please allow me to ask it here. Does this mean developers should produce InternalRow here for each partition? InternalRow is under catalyst and not meant to be exposed.

Contributor:

The rationale is that data source v2 is not stable yet, and we should make it usable first, so that more people implement data sources and provide feedback. Eventually we should design a stable and efficient row builder for data source v2, but for now we should switch to InternalRow to make it usable. Row is too slow for implementing a decent data source (like Iceberg).
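For reference, a minimal sketch of a reader returning InternalRow under the API shape shown in this diff; the InputPartitionReader interface and the two-column schema are assumptions based on the 2.4-era DataSourceV2 API, not something defined in this thread:

```scala
import java.util.{Collections, List => JList}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, InputPartitionReader}
import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String

class OneRowReader extends DataSourceReader {
  override def readSchema(): StructType =
    new StructType().add("id", LongType).add("data", StringType)

  // Each partition now produces InternalRow, as required by the new signature above.
  override def planInputPartitions(): JList[InputPartition[InternalRow]] =
    Collections.singletonList(new OneRowPartition)
}

class OneRowPartition extends InputPartition[InternalRow] {
  override def createPartitionReader(): InputPartitionReader[InternalRow] =
    new InputPartitionReader[InternalRow] {
      private var consumed = false
      override def next(): Boolean = { val hasNext = !consumed; consumed = true; hasNext }
      override def get(): InternalRow =
        new GenericInternalRow(Array[Any](1L, UTF8String.fromString("a")))
      override def close(): Unit = {}
    }
}
```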

Member:

Ah, okie. thanks!

jzhuge pushed a commit to jzhuge/spark that referenced this pull request Mar 7, 2019
This updates the DataSourceV2 API to use InternalRow instead of Row for the default case with no scan mix-ins.

Support for readers that produce Row is added through SupportsDeprecatedScanRow, which matches the previous API. Readers that used Row now implement this class and should be migrated to InternalRow.

Readers that previously implemented SupportsScanUnsafeRow have been migrated to use no SupportsScan mix-ins and produce InternalRow.

This uses existing tests.

Author: Ryan Blue <blue@apache.org>

Closes apache#21118 from rdblue/SPARK-23325-datasource-v2-internal-row.

(cherry picked from commit 9d27541)

Conflicts:
	external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
	external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
	external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
	sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala
	sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
	sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
jzhuge pushed a commit to jzhuge/spark that referenced this pull request Mar 7, 2019
This is a follow up of apache#21118 .

In apache#21118 we added `SupportsDeprecatedScanRow`. Ideally data source should produce `InternalRow` instead of `Row` for better performance. We should remove `SupportsDeprecatedScanRow` and encourage data sources to produce `InternalRow`, which is also very easy to build.

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#21921 from cloud-fan/row.

(cherry picked from commit defc54c)

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
	sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
	sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala
jzhuge pushed a commit to jzhuge/spark that referenced this pull request Mar 7, 2019
A follow up of apache#21118

Since we use `InternalRow` in the read API of data source v2, we should do the same thing for the write API.

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#21948 from cloud-fan/row-write.

(cherry picked from commit ac527b5)

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterProvider.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
	sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
	sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
rdblue added a commit to rdblue/spark that referenced this pull request Apr 3, 2019
This updates the DataSourceV2 API to use InternalRow instead of Row for the default case with no scan mix-ins.

Support for readers that produce Row is added through SupportsDeprecatedScanRow, which matches the previous API. Readers that used Row now implement this class and should be migrated to InternalRow.

Readers that previously implemented SupportsScanUnsafeRow have been migrated to use no SupportsScan mix-ins and produce InternalRow.

This uses existing tests.

Author: Ryan Blue <blue@apache.org>

Closes apache#21118 from rdblue/SPARK-23325-datasource-v2-internal-row.
rdblue pushed a commit to rdblue/spark that referenced this pull request Apr 3, 2019
This is a follow up of apache#21118 .

In apache#21118 we added `SupportsDeprecatedScanRow`. Ideally data source should produce `InternalRow` instead of `Row` for better performance. We should remove `SupportsDeprecatedScanRow` and encourage data sources to produce `InternalRow`, which is also very easy to build.

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#21921 from cloud-fan/row.
rdblue pushed a commit to rdblue/spark that referenced this pull request Apr 3, 2019
A follow up of apache#21118

Since we use `InternalRow` in the read API of data source v2, we should do the same thing for the write API.

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#21948 from cloud-fan/row-write.
jzhuge pushed a commit to jzhuge/spark that referenced this pull request Oct 15, 2019
This updates the DataSourceV2 API to use InternalRow instead of Row for the default case with no scan mix-ins.

Support for readers that produce Row is added through SupportsDeprecatedScanRow, which matches the previous API. Readers that used Row now implement this class and should be migrated to InternalRow.

Readers that previously implemented SupportsScanUnsafeRow have been migrated to use no SupportsScan mix-ins and produce InternalRow.

This uses existing tests.

Author: Ryan Blue <blue@apache.org>

Closes apache#21118 from rdblue/SPARK-23325-datasource-v2-internal-row.

(cherry picked from commit 9d27541)

Conflicts:
	external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
	external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
	external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
	sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala
	sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
	sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
jzhuge pushed a commit to jzhuge/spark that referenced this pull request Oct 15, 2019
This is a follow up of apache#21118 .

In apache#21118 we added `SupportsDeprecatedScanRow`. Ideally data source should produce `InternalRow` instead of `Row` for better performance. We should remove `SupportsDeprecatedScanRow` and encourage data sources to produce `InternalRow`, which is also very easy to build.

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#21921 from cloud-fan/row.

(cherry picked from commit defc54c)

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
	sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
	sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala
jzhuge pushed a commit to jzhuge/spark that referenced this pull request Oct 15, 2019
A follow up of apache#21118

Since we use `InternalRow` in the read API of data source v2, we should do the same thing for the write API.

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#21948 from cloud-fan/row-write.

(cherry picked from commit ac527b5)

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterProvider.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
	sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
	sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023
This updates the DataSourceV2 API to use InternalRow instead of Row for the default case with no scan mix-ins.

Support for readers that produce Row is added through SupportsDeprecatedScanRow, which matches the previous API. Readers that used Row now implement this class and should be migrated to InternalRow.

Readers that previously implemented SupportsScanUnsafeRow have been migrated to use no SupportsScan mix-ins and produce InternalRow.

This uses existing tests.

Author: Ryan Blue <blue@apache.org>

Closes apache#21118 from rdblue/SPARK-23325-datasource-v2-internal-row.

Ref: LIHADOOP-48531

RB=1855575
A=
otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023
A follow up of apache#21118

Since we use `InternalRow` in the read API of data source v2, we should do the same thing for the write API.

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#21948 from cloud-fan/row-write.

Ref: LIHADOOP-48531

RB=1855948
G=superfriends-reviewers
R=yezhou,latang,mshen,fli,zolin
A=