[VL] Unified design for data lake read support in Gluten + Velox #3378

Open
yma11 opened this issue Oct 11, 2023 · 32 comments
Labels
enhancement New feature or request on track On track by Velox Gluten team

Comments

@yma11
Contributor

yma11 commented Oct 11, 2023

Description

Currently there are two open PRs in Gluten to support Iceberg COW table read and Delta Lake read. There is also an active discussion in Velox about Iceberg read support. Consolidating these ideas, and based on Gluten's position, we would like to share a draft unified design for data lake read support in Gluten.

As stated on this project's home page, one of Gluten's key functions is to transform Spark's whole-stage physical plan into a Substrait plan and send it to the native side. This also applies to data lake read support, thus:

  1. We'd better avoid hacking the original Spark physical plan nodes. Gluten core has plan transformers that generate the correct plan info in Substrait format and pass it to Velox for reading and computation. So whatever the hack is, such as column mapping, it should be doable in the transformer layer. IMO, we should try our best to pass the original info in the Spark plan to Velox, with the transformer acting as a bridge, and consume it correctly on the Velox side, unless that is not doable or Velox can't support it. By the way, column mapping is a common feature across many file formats, so Velox can handle it at its datasource level, and the community plans to do so.

  2. A clear transformer hierarchy is needed for the different data lake backends. In the Iceberg COW table read PR, a new branch is added to do Iceberg-specific processing, leveraging a utility class placed in a dedicated folder. In the future, I believe more branches will be needed to support other cases, like MOR. So introducing a new transformer that inherits from BatchScanExecTransformer would be a better way. A possible hierarchy looks like the following:

                                IcebergDataSource?                                    DeltaLakeDataSource?                       ?
                                                    \                                     |                                 / 
                                                            \                             |                           /
                                                                   \                      |                      /
                                                                                       velox
                                                                                          |
                                                                                     substrait
                                                               /                          |                    \
                                                  /                                       |                             \
        SparkBatchQueryScanExecTransformer                           DeltaLakeScanExecTransformer                                 \
                                  |                                                       |
                  BatchScanExecTransformer                              FileSourceScanExecTransformer         HiveTableScanExecTransformer
                                    \                                                     |                                    /
                                                                         BasicScanExecTransformer

@YannByron @felipepessoto @liujiayi771 @ulysses-you, please give comments on above suggestions. Thanks.

@yma11 yma11 added the enhancement New feature or request label Oct 11, 2023
@liujiayi771
Contributor

liujiayi771 commented Oct 11, 2023

@yma11 In fact, these two PRs were both proposed by our team. @YannByron and I have discussed offline how to reach a better architecture design for these two PRs. We agree with what you said, and we can take responsibility for improving this area and provide a better design structure as soon as possible.

@liujiayi771
Contributor

We also encountered some problems with BatchScanExecTransformer, and the current implementation may also need optimization.

@YannByron
Contributor

Hi @yma11, thank you for the invitation.

Here I will briefly explain that our upcoming design will mainly cover three parts of the integration between lake formats and Gluten/Velox. IMO all these problems are related to the lake format, but they can be solved independently:

1 Schema Related:

For example, column mapping. I think this part can be solved at the gluten-core layer, as in #3376. But perhaps a better solution is to rewrite the Transformer rather than the SparkPlan (which is what the PR does), considering that Gluten should return the original SparkPlan when fallback occurs.

As @yma11 mentioned, Velox plans to implement column mapping in the native layer (I am very worried that this will make the native readers/datasources more complicated). Even so, I think we can still accept the current solution (rewriting SparkPlan or Transformer), for these reasons:

  1. This solution is simple enough, and it is easy to switch to the other solution (Velox handling column mapping) if we have to.
  2. Let users use it first in the scenario they need. After all, there is no exact schedule to support this feature.
  3. Moreover, this solution provides a small plan-rewriting framework that can be reused when needed.

2 Spark Datasource V1 and V2:

Currently Gluten/Velox already supports FileSourceScanExecTransformer (corresponding to DSV1, which Delta Lake uses) and BatchScanExecTransformer (corresponding to DSV2, which Iceberg uses). For DSV2, different datasources have different implementations of org.apache.spark.sql.connector.read.Scan. We have to build the GlutenPartition used by the native reader by parsing these different Scan implementations; that's #3043 by @liujiayi771 (this PR also allows Iceberg COW tables to use the native reader of the underlying storage file format, such as ORC/Parquet, which is discussed below).
IMO, the key to this part is how to make the framework design more reasonable, clearer, and easier to extend (as @ulysses-you also said in #3043 (comment)).

3 COW and MOR tables:

To support COW tables (Delta Lake non-DV tables, Hudi COW tables, Iceberg V1 tables, Paimon append-only tables), one native implementation that works across the different lake formats is to use the underlying file format readers (Parquet/ORC), which is the approach adopted by #3043 and #3376. I personally think this method is acceptable under the current circumstances.
To support MOR tables, a separate datasource (e.g. IcebergDataSource) must be implemented in the Velox layer (obviously, the Velox community needs to follow up here, as with Iceberg in facebookincubator/velox#5977).
I hope the separate datasource implementations will also consider supporting COW tables (once this is done, it is easy to adjust the native implementation for COW tables).

In addition, on the code framework (or the transformer hierarchy), our thoughts are consistent with what @yma11 mentioned above.

Next, we will propose the design (maybe as a draft) soon, cc @liujiayi771, so that we can discuss it in depth.

@yma11
Contributor Author

yma11 commented Oct 12, 2023

  1. This solution is simple enough, and it is easy to switch to the other solution (Velox handling column mapping) if we have to.
  2. Let users use it first in the scenario they need. After all, there is no exact schedule to support this feature.

It's acceptable to continue with the current solution for things like column mapping, and then switch to Velox once it is supported, so that Gluten can adopt it quickly. But even so, a clear and detailed design needs to be finalized first so that the community can work together based on it. Let's discuss more when the draft is ready.

@FelixYBW
Contributor

FelixYBW commented Oct 18, 2023

these two PRs were both proposed by our team.

Are the two PRs submitted?

@YannByron
Contributor

YannByron commented Nov 3, 2023

Goals

Make Gluten/Velox support queries on lake formats (Delta Lake, Iceberg).

  • support both Copy-On-Write table and Merge-On-Read table;

    • DeltaLake with/without Deletion Vector

    • Iceberg V1/V2 Table

    • Hudi COW/MOR Table

  • support column-mapping mode

Some design thoughts:

  • In the Gluten project, gluten-core and the lake-format-related parts are split into separate modules, and the logic specific to each lake format is analyzed and processed separately;

  • enable Gluten to support lake formats in a plug-in manner; gluten-core should not depend on any lake format module.

Design Sketch

Project Framework

image

gluten-core needs to provide some interfaces that are used to plug in lake-format-specific logic. Some of these interfaces are shown below:

    gluten-core
      io/glutenproject/
        extension/
          trait RewritePlanRules
        execution/
          trait BaseDataSource
          trait BaseScanTransformer

RewritePlanRules and Column Mapping

Column mapping allows table columns and the underlying file (Parquet/ORC) columns to use different names. This enables schema evolution operations such as RENAME COLUMN and DROP COLUMNS without the need to rewrite the underlying files.

Once we scan a table that enables column mapping (for example, a Delta table), we have to deal with the relationship between table columns and file columns. This can be achieved by rewriting the plan.
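
To make the relationship between table columns and file columns concrete, here is a minimal sketch, assuming a plain name-to-name mapping. ColumnMapping and ColumnMappingSketch are hypothetical names for illustration only; they are not Gluten or Delta Lake APIs.

```scala
// Hypothetical stand-in; not a Gluten or Delta Lake API.
final case class ColumnMapping(logicalName: String, physicalName: String)

object ColumnMappingSketch {
  /** Translate the logical column names a query asks for into physical file column names. */
  def toPhysical(requested: Seq[String], mapping: Seq[ColumnMapping]): Seq[String] = {
    val byLogical = mapping.map(m => m.logicalName -> m.physicalName).toMap
    // Assumption: a column without a mapping entry keeps its name on disk.
    requested.map(name => byLogical.getOrElse(name, name))
  }

  /** Rename the physical columns of a scanned row back to their logical names. */
  def toLogical(row: Map[String, Any], mapping: Seq[ColumnMapping]): Map[String, Any] = {
    val byPhysical = mapping.map(m => m.physicalName -> m.logicalName).toMap
    row.map { case (name, value) => byPhysical.getOrElse(name, name) -> value }
  }
}
```

The scan side would use toPhysical to decide which file columns to read, and a projection on top of the scan would play the role of toLogical.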

image

To solve this case, RewritePlanRules is abstracted out; it provides the ability to rewrite/transform a SparkPlan before the SparkPlan is transformed into a GlutenPlan.

The interface looks like this:

    trait RewritePlanRules {
    	def rules: Seq[Rule[SparkPlan]]
    }

Then a RewritePlan rule first loads all implementations of RewritePlanRules and injects them into ColumnarOverrides.

    case class RewritePlan(session: SparkSession) extends Rule[SparkPlan] {
    
      // 1. load all the implementations of [[ RewritePlanRules ]]
      val rules: Seq[Rule[SparkPlan]] = ...
    
      // 2. apply these rules one by one
      def apply(plan: SparkPlan): SparkPlan = {
        rules.foldLeft(plan) {
          case (plan, rule) =>
            rule(plan)
        }
      }
    }

And we can define DeltaRewritePlanRules in gluten-delta, with a rule that transforms each DeltaScan node into Project + FileScan.
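
A rough, Spark-free sketch of how such a Delta rule and the fold over rules could fit together. Plan, Scan, Project, PlanRule and the hard-coded column names are all stand-ins invented for this example (assumptions, not the real SparkPlan or Rule types), and the rule only inspects the top-level node to keep things short.

```scala
// Stand-in plan types so the sketch runs without Spark; all names are hypothetical.
sealed trait Plan
final case class Scan(table: String, format: String, columns: Seq[String]) extends Plan
final case class Project(renames: Map[String, String], child: Plan) extends Plan

trait PlanRule { def apply(plan: Plan): Plan }

trait RewritePlanRules { def rules: Seq[PlanRule] }

// A Delta-only rule: it rewrites Delta scans and leaves every other plan untouched.
object DeltaColumnMappingRule extends PlanRule {
  private val physicalNames = Map("id" -> "col-1", "name" -> "col-2") // assumed mapping
  def apply(plan: Plan): Plan = plan match {
    case Scan(table, "delta", cols) =>
      val physical = cols.map(c => physicalNames.getOrElse(c, c))
      // Scan with physical names, then project them back to the logical names.
      Project(physical.zip(cols).toMap, Scan(table, "parquet", physical))
    case other => other
  }
}

object DeltaRewritePlanRules extends RewritePlanRules {
  val rules: Seq[PlanRule] = Seq(DeltaColumnMappingRule)
}

object RewritePlan {
  // Apply every rule of every provider, one after another.
  def apply(providers: Seq[RewritePlanRules], plan: Plan): Plan =
    providers.flatMap(_.rules).foldLeft(plan)((p, rule) => rule(p))
}
```

Because the rule matches only on the "delta" format, a scan that belongs to another lake format passes through unchanged.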

Datasource Scan

Currently, Spark supports four Scan types:

  • HiveTableScanExec: used for Hive tables.

  • BatchScanExec: used for v2 datasources, which are supported since Spark 3.0, e.g. Iceberg.

  • FileSourceScanExec: used for HadoopFsRelation, e.g. Parquet, Delta, Hudi (COW without schema on read).

  • RowDataSourceScanExec: used for datasources that return data directly rather than files, e.g. Hudi.

Gluten supports the first three types.

For RowDataSourceScanExec, I think tables that use it need to provide a native version and integrate it into Velox, and then Gluten can work on them. The following design is applicable to RowDataSourceScanExec, but we focus on BatchScanExec and FileSourceScanExec at the implementation level.

For BatchScanExec, the core abstraction in Spark is Batch, which defines two key things:

  1. planInputPartitions: the data splits that can be processed by Spark tasks.

  2. createReaderFactory: for each data split, this defines how to scan tuples from it.

These two functions are functionally aligned with FileIndex and FileFormat, which are used in datasource v1.

Scan Transformer Structure

Here this design refines the Scan Transformer structure.

image

BaseDataSource

Encapsulate the necessary datasource information.

    trait BaseDataSource {
    
      def dataSchema: StructType
      def partitionSchema: StructType
      
      def partitions: Array[InputPartition]
      def fileFormat: String
    
      def getInputFilePaths: Seq[String]
    }

BaseScanTransformer

the core abstraction, with these key abilities:

  1. inherit BaseDataSource;

  2. for the Velox backend, provide a list of LocalFilesNode;

  3. for the ClickHouse backend, provide the doExecuteColumnar method;

    trait BaseScanTransformer extends TransformSupport with SupportFormat {
      
      def filterExprs(): Seq[Expression]
    
      def outputAttributes(): Seq[Attribute]
      
      override lazy val supportsColumnar: Boolean = {
        // .......
      }
    
      override def getBuildPlans: Seq[(SparkPlan, SparkPlan)] = {
        // .......
      }
    
      override def columnarInputRDDs: Seq[RDD[ColumnarBatch]] = Seq.empty
      
      override protected def doValidateInternal(): ValidationResult = {
        // ......
      }
    
      override def doTransform(context: SubstraitContext): TransformContext = {
        // ......
      }
    
      override def doExecuteColumnar(): RDD[ColumnarBatch]
    
      
      override def getLocalFilesNodes: Array[LocalFilesNode] = {
        partitions.map { p =>
          constructLocalFilesNode(p)
        }
      }
    
      def constructLocalFilesNode(input: InputPartition): LocalFilesNode
    }

Notice: 

For MOR tables, LocalFilesNode doesn't carry enough information. Thus, we allow creating a child class to extend it, and the child class is required to override the toProtobuf method. An example is given in the Iceberg part below.
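
The subclassing idea can be sketched without any Substrait dependency. In this sketch a plain Map stands in for the serialized ReadRel.LocalFiles message, and MorLocalFilesNode and the simplified DeleteFile are hypothetical names, so treat it as an illustration of the override pattern rather than the real classes.

```scala
// Hypothetical stand-ins: the real LocalFilesNode emits a Substrait ReadRel.LocalFiles message;
// here a plain Map plays the role of the serialized message.
class LocalFilesNode(val paths: Seq[String], val format: String) {
  def toProtobuf: Map[String, Any] = Map("paths" -> paths, "format" -> format)
}

// Simplified delete file: content 1 = position delete, 2 = equality delete.
final case class DeleteFile(path: String, content: Int)

// The MOR variant adds delete files on top of whatever the base node serializes.
class MorLocalFilesNode(paths: Seq[String], format: String, val deleteFiles: Seq[DeleteFile])
    extends LocalFilesNode(paths, format) {
  override def toProtobuf: Map[String, Any] =
    super.toProtobuf + ("deleteFiles" -> deleteFiles.map(_.path))
}
```

Callers keep working with the LocalFilesNode type; only the serialized payload differs for MOR splits.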

Datasource v1 scan transformer

DatasourceScanTransformer

the base class for datasource v1.

    abstract class DatasourceScanTransformer extends BaseScanTransformer with BaseDataSource {
    
      val relation: BaseRelation
      
      val tableIdentifier: Option[TableIdentifier]
    }

FileSourceScanTransformer:  the scan implementation of Datasource v1 based on HadoopFSRelation, and extends DatasourceScanTransformer;

DeltaLakeScanTransformer:  the scan implementation of Delta table, located in gluten-delta, extends FileSourceScanTransformer;

Datasource v2 scan transformer

BatchScanTransformer

the base class for datasource v2.

    class BatchScanTransformer(
      scan: Scan
      // ....
      // ....
    ) extends BaseScanTransformer {
    
      override def constructLocalFilesNode(input: InputPartition): LocalFilesNode = {
        //...
      }
    }

IcebergScanTransformer

the scan implementation of Iceberg table, located in gluten-iceberg, extends BatchScanTransformer;

Here is a detailed explanation.

    case class DeleteFile (
      path: String,
      fileContent: Int, // 1 => position delete, 2 => equality delete
      fileFormat: ReadFileFormat,
      fileSize: Long,
      recordCount: Long,
      lowerBounds: Map[Int, String],
      upperBounds: Map[Int, String]
    )
    
    class IcebergLocalFilesNode extends LocalFilesNode {
    
      private val deleteFiles: List[List[DeleteFile]]
      
      override def toProtobuf(): ReadRel.LocalFiles = {
        //....
      }
    }
    
    class IcebergScanTransformer(
      scan: SparkBatchQueryScan
      // ....
      // ....
    ) extends BatchScanTransformer {
    
      // convert InputPartition to IcebergLocalFilesNode
      override def constructLocalFilesNode(input: InputPartition): LocalFilesNode = {
        // ...
      }
    }

To support Iceberg V2 tables (MOR), the data split needs to contain these deleteFiles. We also extend FileOrFiles accordingly.

Scan Transformer Factory

There are many places that create a scan transformer object based on the file format or scan, so a factory class needs to be provided for them. Among these, the scan transformers of lake formats are initialized by reflection.

    object ScanTransformerFactory {
      def apply(fileSourceExec: FileSourceScanExec): FileSourceScanExecTransformer = {
        ...
      }
      
      def apply(batchScanExec: BatchScanExec): BatchScanTransformer = {
        ...
      }
    }
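
As an illustration of the "initialized by reflection" part, here is a small self-contained sketch. Every class and registry entry in it is hypothetical; only Class.forName and the java.lang.reflect calls are real JVM APIs. In the design above the registry would hold plain class-name strings so that gluten-core never references a lake module at compile time; classOf is used here only so the sketch runs on its own.

```scala
// Hypothetical names throughout; only Class.forName and java.lang.reflect calls are real JVM APIs.
trait ScanTransformer { def describe: String }

class DefaultBatchScanTransformer(scanClass: String) extends ScanTransformer {
  def describe: String = s"default($scanClass)"
}

// Would live in a lake-format module such as gluten-iceberg in the design above.
class IcebergScanTransformerSketch(scanClass: String) extends ScanTransformer {
  def describe: String = s"iceberg($scanClass)"
}

object ScanTransformerFactorySketch {
  // Maps a Scan implementation name to the transformer class that should handle it.
  private val registry = Map(
    "SparkBatchQueryScan" -> classOf[IcebergScanTransformerSketch].getName,
    "SomeOtherLakeScan" -> "org.example.lake.MissingScanTransformer" // jar not on the classpath
  )

  def create(scanClass: String): ScanTransformer =
    registry.get(scanClass) match {
      case Some(className) =>
        try {
          Class.forName(className)
            .getConstructor(classOf[String])
            .newInstance(scanClass)
            .asInstanceOf[ScanTransformer]
        } catch {
          // The lake-format jar is not on the classpath: use the generic transformer.
          case _: ClassNotFoundException => new DefaultBatchScanTransformer(scanClass)
        }
      case None => new DefaultBatchScanTransformer(scanClass)
    }
}
```

Falling back to the generic transformer when the class is missing is one possible policy; falling back to vanilla Spark would be another.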

ReadFileFormat

Let me explain the fileFormat method of BaseDataSource in more detail.

To support the COW tables of lake formats, we allow using the underlying file format until a native reader implementation is provided. So this method can return ParquetReadFormat or OrcReadFormat in the current solution.

To support MOR tables, a native reader is necessary, and it should support both COW and MOR tables. With this, this method should return the specific ReadFileFormat, e.g. DeltaReadFileFormat, IcebergReadFileFormat.
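
One way to read the two cases above, sketched for an Iceberg scan. The ReadFileFormat objects are re-declared locally so the snippet is self-contained, and the fallback-to-vanilla-Spark branch is an assumption of this sketch, not something the design states.

```scala
sealed trait ReadFileFormat
case object ParquetReadFormat extends ReadFileFormat
case object OrcReadFormat extends ReadFileFormat
case object IcebergReadFileFormat extends ReadFileFormat // lake-specific, needs a native reader

object FileFormatChoice {
  /**
   * Pick the format reported to the native side for an Iceberg scan.
   * Returns None when the scan cannot be offloaded (assumed to mean: fall back to vanilla Spark).
   */
  def forIceberg(
      underlying: String,
      hasDeleteFiles: Boolean,
      nativeIcebergReader: Boolean): Option[ReadFileFormat] = {
    if (nativeIcebergReader) {
      Some(IcebergReadFileFormat) // a native reader handles both COW and MOR
    } else if (hasDeleteFiles) {
      None // MOR without a native reader: the deletes would be ignored
    } else {
      underlying.toLowerCase match {
        case "parquet" => Some(ParquetReadFormat)
        case "orc" => Some(OrcReadFormat)
        case _ => None
      }
    }
  }
}
```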

Native Reader

Protobuf

Corresponding to the LocalFilesNode expansion, modify the protobuf FileOrFiles message to extend support for lake formats.

Due to the different parameters required for different lake formats and table types, we need to abstract different ReadOptions for each lake format and pass specific information. For example, in the FileOrFiles message, add the IcebergReadOptions message to convey iceberg-specific information.

Due to the preference of the Velox community, the DeleteFile information is likely to be serialized as a string for transmission. The DeleteFile section in protobuf may be adjusted to a string.

    message IcebergReadOptions {
      enum FileContent {
        POSITION_DELETES = 1;
        EQUALITY_DELETES = 2;
      }
      message DeleteFile {
        FileContent fileContent = 1;
        string filePath = 2;
        oneof file_format {
          ParquetReadOptions parquet = 3;
          OrcReadOptions orc = 4;
        }
        uint64 fileSize = 5;
        uint64 recordCount = 6;
      }
    
      oneof file_format {
        ParquetReadOptions parquet = 1;
        OrcReadOptions orc = 2;
      }
      repeated DeleteFile delete_files = 3;
    }

Native SplitInfo

On the native side, after receiving the protobuf message, we need to construct different SplitInfo objects based on the file_format information recorded in the protobuf. Similarly, on the native side, each lake format needs to construct its own xxSplitInfo, inheriting from SplitInfo, to pass additional information. This information is used to construct the corresponding HiveConnectorSplit in WholeStageResultIteratorFirstStage.

Similar to the handling in Java, the conversion from SplitInfo to HiveConnectorSplit should be abstracted into a method of SplitInfo itself. SplitInfo should be converted to HiveConnectorSplit, IcebergSplitInfo to HiveIcebergSplit, etc.

    struct SplitInfo {
      /// Whether the split comes from arrow array stream node.
      bool isStream = false;
    
      /// The Partition index.
      u_int32_t partitionIndex;
    
      /// The partition columns associated with partitioned table.
      std::vector<std::unordered_map<std::string, std::string>> partitionColumns;
    
      /// The file paths to be scanned.
      std::vector<std::string> paths;
    
      /// The file starts in the scan.
      std::vector<u_int64_t> starts;
    
      /// The lengths to be scanned.
      std::vector<u_int64_t> lengths;
    
      /// The file format of the files to be scanned.
      dwio::common::FileFormat format;
    
      /// Virtual so that lake-specific subclasses can override the conversion.
      virtual velox::connector::ConnectorSplit getConnectorSplit();
    };

    struct IcebergSplitInfo : public SplitInfo {
      std::unordered_map<std::string, std::vector<DeleteFile>> deleteFiles;
      velox::connector::ConnectorSplit getConnectorSplit() override;
    };

@YannByron
Contributor

@yma11 @weiting-chen please help review this. We look forward to your feedback.

@zhztheplayer
Member

zhztheplayer commented Nov 3, 2023

Thanks a lot @YannByron for the detailed design!

Then a RewritePlan rule first loads all implementations of RewritePlanRules and injects them into ColumnarOverrides.

This looks like a very common feature. Did you already have some thoughts on the way to implement this? By a service loader or some kind of reflection tools? Also, should we care about the applying order for the plugged rules?

BatchScanTransformer: the base class for datasource v2.

The hierarchy of datasource transformers in Gluten has an issue that they extend from Vanilla Spark's case classes. For example, BatchScanExecTransformer inherits from case class BatchScanExec and FileSourceScanExecTransformer inherits from case class FileSourceScanExec. By the new BaseScanTransformer -> DatasourceScanTransformer -> BatchScanTransformer design would you mean to completely remove the problematical case-class-extending practices from code base?

Corresponding to the LocalFilesNode expansion, modify the protobuf FileOrFiles message to extend support for lake formats.

Two questions here:

  1. Was all the needed Velox code already contributed to upstream Velox?
  2. Do you already have some thoughts to organize new lake-specific cpp code? Or we just put all the needed code to cpp/core ? (I'll be OK to both since our cpp code is not that well-organized yet. For example the remote shuffle code is still in cpp/core)

And some other questions for the whole design:

  1. To what extent should we modify backends' own code? For example, do we have to apply changes to backends-velox, backends-clickhouse, cpp/velox, etc? (except for test code if any)
  2. IIUC each lake format will have its own Maven module (correct me if I was wrong). So are we going to compile the modules into independent jars to let users drop in, or just to include the compiled code into the Gluten fat jar?
  3. Are we going to have individual Spark config options for each lake format? Do we have to put them into GlutenConfig.scala or somewhere in lake format's own Maven module?

Again, thank you everyone for bringing this up. Can't wait to see it landing.

@liujiayi771
Contributor

liujiayi771 commented Nov 3, 2023

@zhztheplayer

would you mean to completely remove the problematical case-class-extending practices from code base?

I have encountered this issue before, and I also suggest that the community remove this inheritance relationship. We can try to do it.

Was all the needed Velox code already contributed to upstream Velox?

Not yet. But they have already merged many prerequisite PRs, and the PR supporting iceberg has also been proposed. It should be relatively quick to merge.

Do you already have some thoughts to organize new lake-specific cpp code?

I have successfully used the relevant PR from the Velox community to establish the process of reading iceberg mor table. In gluten/cpp, there should only be code related to converting from substrait to Velox icebergSplit. The specific mor process is implemented in Velox. As for other lake formats, there is currently no prerequisite work for mor table in native, but for this framework, we can start by supporting cow table first.

@YannByron
Contributor

@zhztheplayer

This looks like a very common feature. Did you already have some thoughts on the way to implement this? By a service loader or some kind of reflection tools? Also, should we care about the applying order for the plugged rules?

Yep, maybe use a service loader, like org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource does. I think we don't need to care about the order between different lake format modules. But each Rule should guarantee that, within its own module, only plans that belong to its table format are rewritten.
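
A minimal sketch of the service-loader idea, assuming the JDK's java.util.ServiceLoader is used for discovery. PlanRule, the String-typed plan and RewritePlanRulesLoader are stand-ins invented for this example; a real provider would be registered through a META-INF/services file in its lake-format jar.

```scala
import java.util.ServiceLoader
import scala.collection.mutable.ListBuffer

// Stand-in for Rule[SparkPlan]; a plan is modelled as a plain String here (assumption).
trait PlanRule { def apply(plan: String): String }

// Extension point that each lake-format module would implement and list in
// META-INF/services/<fully qualified name of RewritePlanRules>.
trait RewritePlanRules { def rules: Seq[PlanRule] }

object RewritePlanRulesLoader {
  /** Discover every registered provider on the classpath; empty when no lake jar is present. */
  def load(): Seq[RewritePlanRules] = {
    val found = ListBuffer.empty[RewritePlanRules]
    val it = ServiceLoader.load(classOf[RewritePlanRules]).iterator()
    while (it.hasNext) found += it.next()
    found.toList
  }

  /** Apply all discovered rules; each rule is expected to ignore plans that are not its own. */
  def rewrite(plan: String, providers: Seq[RewritePlanRules] = load()): String =
    providers.flatMap(_.rules).foldLeft(plan)((p, rule) => rule(p))
}
```

With no provider on the classpath, rewrite returns the plan unchanged, which matches the goal that gluten-core works without any lake module installed.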

would you mean to completely remove the problematical case-class-extending practices from code base?

In fact, I'm aware of the problem. But this does not affect the design, so it can be discussed separately. cc @liujiayi771

To what extent should we modify backends' own code? For example, do we have to apply changes to backends-velox, backends-clickhouse, cpp/velox, etc? (except for test code if any)

Based on my design, I plan to move the logic of IteratorApi.genFilePartition into the scan transformers. Nothing else needs to be modified.

IIUC each lake format will have its own Maven module (correct me if I was wrong). So are we going to compile the modules into independent jars to let users drop in, or just to include the compiled code into the Gluten fat jar?

Yep. the former way. If users want to use lake format, they need to put the separate jar to the classpath, not a fat jar.

Are we going to have individual Spark config options for each lake format? Do we have to put them into GlutenConfig.scala or somewhere in lake format's own Maven module?

There are no configs in this design. I hope users can enable native queries against a lake format just by putting the gluten-lakeformat jar on the classpath.

@yma11
Contributor Author

yma11 commented Nov 3, 2023

Hi @YannByron and @liujiayi771, thank you guys for such a detailed design!

RewritePlanRules and Column Mapping: are there any other scenarios that will need such RewritePlanRules? If the original plan changes, what will happen if fallback happens on the changed node? What about if this scan has filter pushdown? I think it's not necessary to do this rewrite on plan if the Column Mapping can be done in transformer layer.

To support MOR tables, a native reader is necessary, and it should support both COW and MOR tables. With this, this method should return the specific ReadFileFormat, e.g. DeltaReadFileFormat, IcebergReadFileFormat.

Does it mean that in MOR table case, we don't need to care it's a parquet or orc file?

IIUC each lake format will have its own Maven module (correct me if I was wrong). So are we going to compile the modules into independent jars to let users drop in, or just to include the compiled code into the Gluten fat jar?

Yep. the former way. If users want to use lake format, they need to put the separate jar to the classpath, not a fat jar.

My understanding is that we don't have direct dependency on these lake related jars but do our job based on Spark interfaces. The data lake read/write offloading to Gluten/velox is transparent for users who have Spark+data lake properly worked. right?

@YannByron
Contributor

@yma11

RewritePlanRules and Column Mapping: are there any other scenarios that will need such RewritePlanRules? If the original plan changes, what will happen if fallback happens on the changed node? What about if this scan has filter pushdown? I think it's not necessary to do this rewrite on plan if the Column Mapping can be done in transformer layer.

I don't know whether there are other cases that need RewritePlan or RewriteTransformer. At least Delta column mapping does, and this gives us a mechanism to use if needed. Even if a plan is rewritten, the original, correct plan can be returned when fallback happens, because the original plan is saved before any rule is applied in Gluten. I think either RewritePlan or RewriteTransformer works. I chose the first one because it may be easier to understand.

Does it mean that in MOR table case, we don't need to care it's a parquet or orc file?

In the Java layer, yes. But enough information (the base file's format, the delete files' format) needs to be passed as options to the native layer.

My understanding is that we don't have direct dependency on these lake related jars but do our job based on Spark interfaces. The data lake read/write offloading to Gluten/velox is transparent for users who have Spark+data lake properly worked. right?

These modules (like gluten-delta, gluten-iceberg) depend on the lake-related jars, but gluten-core doesn't. Users just need to put the gluten-datalake jar on the classpath, and then it's transparent for them.

@liujiayi771
Contributor

@zhztheplayer Update the answer to this question.

would you mean to completely remove the problematical case-class-extending practices from code base?

I tried to remove this case-class-extending today. I found that it is used to retrieve the filteredPartitions of BatchScanExec. However, the method is private and cannot be invoked directly, which is why it is accessed through inheritance to obtain the required InputPartition. Apart from the current implementation, reflection seems to be the only option available. Our refactoring does not solve the issue of obtaining the InputPartition.
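
For reference, the reflection route would look roughly like this. ScanExecLike is a hypothetical stand-in for the Spark class, and its member is modelled as a private def; how the real member is compiled (for example as a lazy val getter) is not verified here, so the method name lookup is an assumption.

```scala
// Hypothetical stand-in for a Spark class with a private member we cannot call directly.
class ScanExecLike(all: Seq[String]) {
  private def filteredPartitions: Seq[String] = all.filter(_.startsWith("p"))
}

object PrivateAccess {
  def filteredPartitionsOf(scan: ScanExecLike): Seq[String] = {
    val method = classOf[ScanExecLike].getDeclaredMethod("filteredPartitions")
    // Bypass the private modifier; this can be rejected under a strict module or security policy.
    method.setAccessible(true)
    method.invoke(scan).asInstanceOf[Seq[String]]
  }
}
```

The obvious downside is that a rename or signature change in the target class is only detected at runtime.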

@YannByron
Contributor

YannByron commented Nov 6, 2023

Update: after discussing with @yma11 offline, we chose the solution that rewrites the Transformer, not the SparkPlan, in schema-related scenarios.

@yma11
Contributor Author

yma11 commented Nov 7, 2023

@YannByron @liujiayi771 Thanks for your active involvement. Looking forward to your PRs!

@rz-vastdata
Contributor

Greetings,

We would like to join the discussion and ask for your guidance - we are developing a new Velox connector for integrating our storage engine with the Gluten project. IIUC, the current Gluten design allows pushing down file-based scans of supported formats (e.g. Parquet, ORC, ...) into Velox. However, our storage engine uses RPC-based communication for metadata and data retrieval, similar to the Arrow Flight protocol. Currently we have a Spark Datasource V2 connector and we would like to integrate it with the Gluten project.

We have a few thoughts and would be happy to hear your opinions:

  • we can create a new file format so that the existing Gluten planner logic continues to work (similar to how Delta Lake support was done); however, it would require us to define a new Velox and Substrait file format - please see here for a proof-of-concept implementation (based on the Gluten 1.0 release).
  • we can join and participate in the unified design discussion here so that our current connector can use and benefit from the framework suggested above.

Please let us know what you think would be the best way forward.

@zhouyuan zhouyuan changed the title Unified design for data lake read support in Gluten + Velox [VL] Unified design for data lake read support in Gluten + Velox Nov 16, 2023
@zhouyuan zhouyuan pinned this issue Nov 16, 2023
@liujiayi771
Contributor

Hi @rz-vastdata. We are currently working on implementing the design mentioned above. Iceberg may need a new file format as well. You can participate in the review of our PRs and provide some suggestions. @yma11 We need to expedite the progress of PR review as there are many additional tasks to do.

@rz-vastdata
Contributor

We are currently working on implementing the design mentioned above. Iceberg may need a new file format as well.

Sounds great, thanks for the update!

You can participate in the review of our PRs and provide some suggestions.

We'd be happy to participate in the review.
Could you please let us know which issues/PRs are related to the above design?
Is there a label (https://github.com/oap-project/gluten/labels) or an umbrella issue we can follow?

@liujiayi771
Contributor

@rz-vastdata I have modified the title of the PR, and it will reference this issue.

@rz-vastdata
Contributor

rz-vastdata commented Nov 21, 2023

Following from #3650, IIUC the current approach is to accelerate file-based data sources in Gluten - right?

Asking since in our case, the Vast data source is based on a generic Scan (and not a more specific FileScan), which currently causes it to fail conversion to Gluten plan - e.g.:
https://github.com/oap-project/gluten/blob/30865e5874e33c5996194352f43c911960673cd2/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala#L60-L65
https://github.com/oap-project/gluten/blob/30865e5874e33c5996194352f43c911960673cd2/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala#L483-L508
https://github.com/oap-project/gluten/blob/b40a5f094d911cccaa8eb90a11fe44f78038ea94/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala#L65-L81

Would it be possible to support converting custom Scan-based data sources into Gluten-supported inputs?

I understand that it also requires adding support to the Substrait protobuf definition and the Velox source code, but it seems a better match for the use case where the connector does not use files to store the data. For example, in our case, there is a custom protocol to retrieve the data from an external RPC server, so there is no need to use file-based I/O and abstractions.
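To make the request concrete, here is a minimal sketch of what "converting custom Scan-based data sources" could look like: split generation is dispatched on the scan type rather than assuming a file-based scan. Everything here is hypothetical and for illustration only (`Scan`, `FileScan`, `RpcScan`, `SplitInfo`, `register`, `to_splits`); none of these are Gluten's or Spark's actual classes.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Type


class Scan:
    """Hypothetical stand-in for a generic scan."""


@dataclass
class FileScan(Scan):
    """Hypothetical file-based scan."""
    paths: List[str]


@dataclass
class RpcScan(Scan):
    """Hypothetical scan that fetches data from an external RPC server."""
    endpoint: str
    table: str


@dataclass
class SplitInfo:
    """Hypothetical split description handed to the native side."""
    kind: str
    payload: dict


# Registry mapping a scan type to the function that builds its splits.
_CONVERTERS: Dict[Type[Scan], Callable[[Scan], List[SplitInfo]]] = {}


def register(scan_type: Type[Scan]):
    def decorator(fn):
        _CONVERTERS[scan_type] = fn
        return fn
    return decorator


@register(FileScan)
def file_splits(scan: FileScan) -> List[SplitInfo]:
    return [SplitInfo("file", {"path": p}) for p in scan.paths]


@register(RpcScan)
def rpc_splits(scan: RpcScan) -> List[SplitInfo]:
    return [SplitInfo("rpc", {"endpoint": scan.endpoint, "table": scan.table})]


def to_splits(scan: Scan) -> List[SplitInfo]:
    # Walk the class hierarchy so subclasses reuse their parent's converter.
    for cls in type(scan).__mro__:
        converter = _CONVERTERS.get(cls)
        if converter is not None:
            return converter(scan)
    # No converter registered: the caller would fall back to the non-native path.
    raise NotImplementedError(f"no converter for {type(scan).__name__}")
```

With a registry like this, a non-file connector only needs to register its own converter; an unregistered scan type raises and can be left on the regular Spark path.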

BTW, how is this handled when Gluten is used to interface with external hardware devices (e.g. FPGA/GPU/ASIC accelerators, as shown in
https://user-images.githubusercontent.com/47296334/199617207-1140698a-4d53-462d-9bc7-303d14be060b.png)?

@Yohahaha
Contributor

Now that #3650 is merged, I suggest defining a SplitInfo proto on the Gluten side, i.e. not using the Substrait LocalFile. Then we can decouple SplitInfo from the Substrait plan, because the plan is immutable within a given stage but SplitInfo is mutable. This gives a cleaner design and interface, and helps reduce serialization cost.

@yma11
Contributor Author

yma11 commented Nov 22, 2023

@Yohahaha What's the purpose of making SplitInfo mutable? I think any modification to SplitInfo should be done before it's created. It's generated from the Spark plan, which means it is originally part of the plan info. Currently all plan-related info is passed to the Velox side through Substrait, and it's better not to scatter such info in Gluten.

@Yohahaha
Contributor

@yma11

> the plan is immutable within a given stage but SplitInfo is mutable

By "the plan (or plan fragment) is immutable" I mean that it does not contain task info; by "SplitInfo is mutable" I mean that every task of a stage has its own SplitInfo to process.

Combining them adds a lot of complexity: we need to pass each task's SplitInfo into the plan and then serialize the whole plan. We hit memory issues with this before, and serialization cost issues now.

> it's better not to scatter such info in Gluten.

So you'd like to use LocalFileNode for all formats? Does it satisfy all cases?

@yma11
Contributor Author

yma11 commented Nov 22, 2023

> Combining them adds a lot of complexity: we need to pass each task's SplitInfo into the plan and then serialize the whole plan. We hit memory issues with this before, and serialization cost issues now.

The serialized Substrait plan is the complete definition of the corresponding Velox task, so it needs to contain SplitInfo. Serialization happens for each task even if we don't pass SplitInfo, as the plan may still contain other task-specific info. Even if you serialize only once on the driver side, you need to combine this shared plan part with the task-specific info, and that will not be straightforward or clean. You may get some memory savings, but execution time should not benefit.

> it's better not to scatter such info in Gluten.

> So you'd like to use LocalFileNode for all formats? Does it satisfy all cases?

I am not sure, but we are open to adding more fields to this proto if needed.
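For readers following the trade-off discussed in the last few comments, here is a minimal sketch of the two options: embedding each task's splits in the plan and serializing it per task, versus serializing the shared plan once and shipping splits separately. It uses JSON as a stand-in for the Substrait/protobuf serialization, and all names (`serialize_embedded`, `serialize_decoupled`, `rebuild_task`, the plan and split fields) are hypothetical.

```python
import json


def serialize_embedded(plan, splits_per_task):
    # One payload per task; every payload repeats the full plan.
    return [json.dumps({"plan": plan, "splits": splits}).encode()
            for splits in splits_per_task]


def serialize_decoupled(plan, splits_per_task):
    # The shared plan is serialized once; each task only gets its own splits.
    plan_bytes = json.dumps(plan).encode()
    split_bytes = [json.dumps(splits).encode() for splits in splits_per_task]
    return plan_bytes, split_bytes


def rebuild_task(plan_bytes, task_split_bytes):
    # The extra step on the consumer side: recombine the shared plan with
    # the task-specific splits before execution.
    return {"plan": json.loads(plan_bytes), "splits": json.loads(task_split_bytes)}


# Made-up plan and splits, for illustration only.
plan = {"relations": ["scan", "filter", "project"],
        "schema": [f"col{i}" for i in range(50)]}
splits_per_task = [[{"path": f"part-{t}.parquet", "start": 0, "length": 1024}]
                   for t in range(100)]

embedded = serialize_embedded(plan, splits_per_task)
plan_bytes, split_bytes = serialize_decoupled(plan, splits_per_task)

embedded_total = sum(len(payload) for payload in embedded)
decoupled_total = len(plan_bytes) + sum(len(b) for b in split_bytes)
```

In this toy setup `decoupled_total` is smaller than `embedded_total` because the plan is not repeated per task, while `rebuild_task` shows the recombination step that the decoupled approach adds. The sketch only compares serialized sizes; it says nothing about execution time.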

@liujiayi771
Contributor

liujiayi771 commented Nov 22, 2023

@rz-vastdata #3650 is just preparatory work. The next PR will be submitted next week and will include IcebergScanTransformer, which contains the specific way of generating SplitInfo for Iceberg. Iceberg's Scan also inherits from Spark Scan, which I understand is consistent with your scenario.

@liujiayi771
Contributor

@yma11 It seems that some of the information required for the Iceberg format can be added to LocalFileNode, but it may appear somewhat messy. We can take a look at it together later. The next PR will not include this part yet, as it will only support the COW table. We will need to wait for the Iceberg-related PRs in the Velox community to be merged before we submit the modifications. However, we can put forward an initial draft PR before that.

@rz-vastdata
Contributor

> Iceberg's Scan also inherits from Spark Scan, which I understand is consistent with your scenario.

Sounds good, many thanks @liujiayi771!

@rz-vastdata
Contributor

rz-vastdata commented Nov 26, 2023

By the way, is there an open-source example for Gluten being used to interface with external devices (e.g. FPGA/GPU/ASIC accelerators, as shown at https://github.com/oap-project/gluten#2-architecture), e.g. when the data sources are not file-based?

@AlexanderChiuluvB

> By the way, is there an open-source example for Gluten being used to interface with external devices (e.g. FPGA/GPU/ASIC accelerators, as shown at https://github.com/oap-project/gluten#2-architecture), e.g. when the data sources are not file-based?

I'm working on the FPGA version.

@EpsilonPrime

Why is there a DeleteFile option to the Iceberg ReadOptions? Presumably you aren't actually deleting files as part of a read operation. Are you ignoring the files instead? For that matter, why is the option that says how to delete named FileContent?

@liujiayi771
Contributor

> Why is there a DeleteFile option to the Iceberg ReadOptions?

Iceberg uses delete files (DeleteFile) to exclude some records in a data file when it is read.
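As a rough illustration of how a delete file excludes records at read time without touching the data file, here is a minimal sketch for the position-delete case, where each delete entry is a (data file path, row position) pair. The function names and the in-memory row lists are assumptions made for the example; this is not the Gluten or Velox reader.

```python
from typing import Dict, Iterable, List, Set, Tuple


def build_delete_index(position_deletes: Iterable[Tuple[str, int]]) -> Dict[str, Set[int]]:
    # Group deleted row positions by the data file they refer to.
    index: Dict[str, Set[int]] = {}
    for path, pos in position_deletes:
        index.setdefault(path, set()).add(pos)
    return index


def read_data_file(path: str, rows: List[str],
                   delete_index: Dict[str, Set[int]]) -> List[str]:
    # Skip rows whose position is listed for this file; the input is not modified.
    deleted = delete_index.get(path, set())
    return [row for pos, row in enumerate(rows) if pos not in deleted]


# Made-up data: one data file with four rows, two of them marked as deleted.
deletes = [("f1.parquet", 1), ("f1.parquet", 3), ("f2.parquet", 0)]
f1_rows = ["a", "b", "c", "d"]
visible = read_data_file("f1.parquet", f1_rows, build_delete_index(deletes))
```

Here `visible` contains only the rows at positions 0 and 2, and `f1_rows` is unchanged. Iceberg also has equality deletes, which match rows by column values rather than by position; those are not covered by this sketch.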
