
[SPARK-23817][SQL] Create file source V2 framework and migrate ORC read path #23383

Closed · wants to merge 31 commits

Conversation

@gengliangwang (Member) commented Dec 26, 2018

What changes were proposed in this pull request?

Create a framework for file source V2 based on the data source V2 API.
As a good example of the framework, this PR also migrates the ORC source: the ORC file source supports both row-based and columnar scans, and its implementation is simpler than Parquet's.

Note: since only the read path of the V2 API is currently done, this framework and migration cover the read path only.
The following scans are supported:

  • Scan ColumnarBatch
  • Scan UnsafeRow
  • Push down filters
  • Push down required columns

Not supported (due to limitations of the data source V2 API):

  • Stats metrics
  • Catalog table
  • Writes

How was this patch tested?

Unit test
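
As a quick illustration of the pushdowns listed above, a query against an ORC table can prune columns and push filters into the scan. This is a hedged sketch: the input path is hypothetical, and the exact explain() output varies by version.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("orc-v2-pushdown-demo")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Hypothetical input path; assumes an ORC dataset with id and name columns.
val df = spark.read.format("orc").load("/tmp/users.orc")

// Both the column list and the filter can be pushed down into the ORC scan;
// the physical plan should show the pushed filters and a pruned read schema.
df.select("id", "name").where($"id" > 100).explain()
```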

@gengliangwang (Member, Author)

There was an earlier PR for this: #20933. This one migrates ORC using the latest data source V2 API.

@SparkQA commented Dec 26, 2018

Test build #100446 has finished for PR 23383 at commit ed1a7fe.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class FallBackFileDataSourceToV1(sparkSession: SparkSession) extends Rule[LogicalPlan]
  • case class FilePartition(index: Int, files: Seq[PartitionedFile])
  • class EmptyPartitionReader[T] extends PartitionReader[T]
  • trait FileDataSourceV2 extends TableProvider with DataSourceRegister
  • class FilePartitionReader[T](
  • abstract class FilePartitionReaderFactory extends PartitionReaderFactory
  • abstract class FileScan(sparkSession: SparkSession,
  • abstract class FileScanBuilder(
  • abstract class FileTable(options: DataSourceOptions, userSpecifiedSchema: Option[StructType])
  • class PartitionRecordReader[T](
  • class PartitionRecordDReaderWithProject[X, T](
  • class OrcDataSourceV2 extends FileDataSourceV2
  • case class OrcPartitionReaderFactory(
  • case class OrcScan(
  • case class OrcScanBuilder(
  • case class OrcTable(options: DataSourceOptions, userSpecifiedSchema: Option[StructType])

@HyukjinKwon (Member)

retest this please

@SparkQA commented Dec 26, 2018

Test build #100454 has finished for PR 23383 at commit ed1a7fe.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class FallBackFileDataSourceToV1(sparkSession: SparkSession) extends Rule[LogicalPlan]
  • case class FilePartition(index: Int, files: Seq[PartitionedFile])
  • class EmptyPartitionReader[T] extends PartitionReader[T]
  • trait FileDataSourceV2 extends TableProvider with DataSourceRegister
  • class FilePartitionReader[T](
  • abstract class FilePartitionReaderFactory extends PartitionReaderFactory
  • abstract class FileScan(sparkSession: SparkSession,
  • abstract class FileScanBuilder(
  • abstract class FileTable(options: DataSourceOptions, userSpecifiedSchema: Option[StructType])
  • class PartitionRecordReader[T](
  • class PartitionRecordDReaderWithProject[X, T](
  • class OrcDataSourceV2 extends FileDataSourceV2
  • case class OrcPartitionReaderFactory(
  • case class OrcScan(
  • case class OrcScanBuilder(
  • case class OrcTable(options: DataSourceOptions, userSpecifiedSchema: Option[StructType])

@SparkQA commented Jan 2, 2019

Test build #100639 has finished for PR 23383 at commit 01c7b07.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class FileTable(
  • case class OrcTable(

@SparkQA commented Jan 3, 2019

Test build #100686 has finished for PR 23383 at commit f925e8f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang (Member, Author)

retest this please.

@gengliangwang changed the title from [WIP][SPARK-23817][SQL] Migrate ORC file format read path to data source V2 to [SPARK-23817][SQL] Migrate ORC file format read path to data source V2 on Jan 3, 2019
@gengliangwang (Member, Author)

Pending #23387 being merged.

@SparkQA commented Jan 3, 2019

Test build #100693 has finished for PR 23383 at commit f925e8f.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang (Member, Author)

retest this please.

@SparkQA commented Jan 3, 2019

Test build #100697 has finished for PR 23383 at commit e1242a4.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class FallBackFileDataSourceToV1(sparkSession: SparkSession) extends Rule[LogicalPlan]
  • case class FilePartition(index: Int, files: Seq[PartitionedFile])
  • class EmptyPartitionReader[T] extends PartitionReader[T]
  • trait FileDataSourceV2 extends TableProvider with DataSourceRegister
  • class FilePartitionReader[T](
  • abstract class FilePartitionReaderFactory extends PartitionReaderFactory
  • abstract class FileScan(
  • abstract class FileScanBuilder(fileIndex: PartitioningAwareFileIndex, schema: StructType)
  • abstract class FileTable(
  • class PartitionRecordReader[T](
  • class PartitionRecordDReaderWithProject[X, T](
  • class OrcDataSourceV2 extends FileDataSourceV2
  • case class OrcPartitionReaderFactory(
  • case class OrcScan(
  • case class OrcScanBuilder(
  • case class OrcTable(

@SparkQA commented Jan 3, 2019

Test build #100701 has finished for PR 23383 at commit e1242a4.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class FallBackFileDataSourceToV1(sparkSession: SparkSession) extends Rule[LogicalPlan]
  • case class FilePartition(index: Int, files: Seq[PartitionedFile])
  • class EmptyPartitionReader[T] extends PartitionReader[T]
  • trait FileDataSourceV2 extends TableProvider with DataSourceRegister
  • class FilePartitionReader[T](
  • abstract class FilePartitionReaderFactory extends PartitionReaderFactory
  • abstract class FileScan(
  • abstract class FileScanBuilder(fileIndex: PartitioningAwareFileIndex, schema: StructType)
  • abstract class FileTable(
  • class PartitionRecordReader[T](
  • class PartitionRecordDReaderWithProject[X, T](
  • class OrcDataSourceV2 extends FileDataSourceV2
  • case class OrcPartitionReaderFactory(
  • case class OrcScan(
  • case class OrcScanBuilder(
  • case class OrcTable(

@gengliangwang (Member, Author)

retest this please.

@SparkQA commented Jan 4, 2019

Test build #100718 has finished for PR 23383 at commit e1242a4.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class FallBackFileDataSourceToV1(sparkSession: SparkSession) extends Rule[LogicalPlan]
  • case class FilePartition(index: Int, files: Seq[PartitionedFile])
  • class EmptyPartitionReader[T] extends PartitionReader[T]
  • trait FileDataSourceV2 extends TableProvider with DataSourceRegister
  • class FilePartitionReader[T](
  • abstract class FilePartitionReaderFactory extends PartitionReaderFactory
  • abstract class FileScan(
  • abstract class FileScanBuilder(fileIndex: PartitioningAwareFileIndex, schema: StructType)
  • abstract class FileTable(
  • class PartitionRecordReader[T](
  • class PartitionRecordDReaderWithProject[X, T](
  • class OrcDataSourceV2 extends FileDataSourceV2
  • case class OrcPartitionReaderFactory(
  • case class OrcScan(
  • case class OrcScanBuilder(
  • case class OrcTable(

@cloud-fan (Contributor)

retest this please

@SparkQA commented Jan 16, 2019

Test build #101325 has finished for PR 23383 at commit 30e0481.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jan 17, 2019

Test build #101347 has finished for PR 23383 at commit 6e87532.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class DummyReadOnlyFileTable extends Table with SupportsBatchRead

@gengliangwang (Member, Author)

retest this please.

@SparkQA commented Jan 17, 2019

Test build #101359 has finished for PR 23383 at commit 6e87532.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class DummyReadOnlyFileTable extends Table with SupportsBatchRead

```scala
// Build a Hadoop configuration from the data source options, then resolve
// and validate the specified root paths (globbing when necessary).
val hadoopConf =
  sparkSession.sessionState.newHadoopConfWithOptions(options.asMap().asScala.toMap)
val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(filePaths, hadoopConf,
  checkEmptyGlobPath = true, checkFilesExist = options.checkFilesExist())
```
Contributor:

We should revisit this later. It doesn't make sense to have different file-listing behaviors between read and write.

I'm fine with it as a workaround for now. In the follow-up we can remove it, fix the tests, and accept the behavior changes for DS v2.

Member Author:

Yes, we can always set checkFilesExist to false here.
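
For context, a hedged sketch of the asymmetry discussed here: a read wants to fail fast on missing inputs, while a write target may legitimately not exist yet. The helper below is illustrative only, not the real DataSource.checkAndGlobPathIfNecessary.

```scala
import java.nio.file.{Files, Paths}

// Illustrative stand-in for path validation with a checkFilesExist switch.
def resolvePaths(paths: Seq[String], checkFilesExist: Boolean): Seq[String] = {
  paths.map { p =>
    // On the read path, a missing input is an error; on the write path the
    // output location is typically created later, so the check is skipped.
    if (checkFilesExist && !Files.exists(Paths.get(p)))
      throw new IllegalArgumentException(s"Path does not exist: $p")
    p
  }
}

// resolvePaths(Seq("/data/in.orc"), checkFilesExist = true)  // read path
// resolvePaths(Seq("/data/out"), checkFilesExist = false)    // write path
```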

@cloud-fan (Contributor)

LGTM, merging to master!

@cloud-fan closed this in c0632ce on Jan 17, 2019
@gengliangwang (Member, Author)

@cloud-fan @dongjoon-hyun @gatorsmile Thanks for the review. I will come up with the file write path migration very soon.

jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
…ion columns

## What changes were proposed in this pull request?

Currently OrcColumnarBatchReader returns all the partition column values in the batch read.
In data source V2, we can improve it by returning only the required partition column values.

This PR is part of apache#23383. As cloud-fan suggested, it was created as a separate PR to make review easier.

Also, this PR doesn't improve `OrcFileFormat`, since in the method `buildReaderWithPartitionValues` the `requiredSchema` filters out all the partition columns, so we can't know which partition columns are required.

## How was this patch tested?

Unit test

Closes apache#23387 from gengliangwang/refactorOrcColumnarBatch.

Lead-authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Co-authored-by: Gengliang Wang <ltnwgl@gmail.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
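
A hedged sketch of the idea in this commit: instead of appending every partition value to each batch, the reader materializes only the partition columns that appear in the required schema. The names below are illustrative, not the actual reader internals.

```scala
// Illustrative only: keep just the partition values the scan asked for.
case class Field(name: String)

def requiredPartitionValues(
    requiredFields: Seq[Field],
    partitionFields: Seq[Field],
    partitionValues: Map[String, Any]): Seq[Any] = {
  val required = requiredFields.map(_.name).toSet
  // A partition column is materialized only if the query requires it.
  partitionFields.collect {
    case f if required.contains(f.name) => partitionValues(f.name)
  }
}
```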
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
…ad path

## What changes were proposed in this pull request?
Create a framework for file source V2 based on the data source V2 API.
As a good example of the framework, this PR also migrates the ORC source: the ORC file source supports both row-based and columnar scans, and its implementation is simpler than Parquet's.

Note: since only the read path of the V2 API is currently done, this framework and migration cover the read path only.
The following scans are supported:
- Scan ColumnarBatch
- Scan UnsafeRow
- Push down filters
- Push down required columns

Not supported (due to limitations of the data source V2 API):
- Stats metrics
- Catalog table
- Writes

## How was this patch tested?

Unit test

Closes apache#23383 from gengliangwang/latest_orcV2.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
…ex in the write path

## What changes were proposed in this pull request?

In apache#23383, the file source V2 framework is implemented. In that PR, `FileIndex` is created as a member of `FileTable`, so that we can implement partition pruning like apache@0f9fcab in the future (as the data source V2 catalog is under development, partition pruning was removed from that PR).

However, after the write path of file source V2 was implemented, I found that a simple write creates an unnecessary `FileIndex`, which is required by `FileTable`. This is a sort of regression, and we can see a warning message when writing to ORC files:
```
WARN InMemoryFileIndex: The directory file:/tmp/foo was not found. Was it deleted very recently?
```
This PR makes `FileIndex` a lazy value in `FileTable`, so that we can avoid creating an unnecessary `FileIndex` in the write path.

## How was this patch tested?

Existing unit test

Closes apache#23774 from gengliangwang/moveFileIndexInV2.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
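
The fix above leans on Scala's lazy val semantics: initialization runs on first access, so a write-only code path never triggers the file listing. A minimal hedged illustration (not the actual FileTable code):

```scala
// Eager member: the (expensive) listing runs as soon as the table is created.
class EagerTable(paths: Seq[String]) {
  val index: Seq[String] = { println("listing files..."); paths }
}

// Lazy member: the listing is deferred until the index is actually read,
// so a pure write never pays for it.
class LazyTable(paths: Seq[String]) {
  lazy val index: Seq[String] = { println("listing files..."); paths }
}

object LazyIndexDemo extends App {
  new EagerTable(Seq("/tmp/foo"))         // prints "listing files..."
  val t = new LazyTable(Seq("/tmp/foo"))  // prints nothing yet
  t.index                                 // now prints "listing files..."
}
```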
mccheah pushed a commit to palantir/spark that referenced this pull request May 15, 2019
…ex in the write path
