Skip to content

Commit

Permalink
[Delta] fix UT
Browse files Browse the repository at this point in the history
  • Loading branch information
寻径 committed Dec 14, 2023
1 parent e4015ab commit 77c25f3
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ object ScanTransformerFactory {
} else {
ExpressionConverter.transformDynamicPruningExpr(scanExec.partitionFilters, reuseSubquery)
}
val fileIndex = scanExec.relation.location
lookupDataSourceScanTransformer(fileIndex.getClass.getName) match {
val fileFormat = scanExec.relation.fileFormat
lookupDataSourceScanTransformer(fileFormat.getClass.getName) match {
case Some(clz) =>
clz
.getDeclaredConstructor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.glutenproject.extension
package io.glutenproject.execution

import io.glutenproject.execution.FileSourceScanExecTransformer
import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat

import org.apache.spark.sql.catalyst.TableIdentifier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.glutenproject.extension

import io.glutenproject.execution.{DataSourceScanTransformerRegister, FileSourceScanExecTransformer}
package io.glutenproject.execution

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.FileSourceScanExec

class DeltaScanTransformerProvider extends DataSourceScanTransformerRegister {

override val scanClassName: String = "org.apache.spark.sql.delta.files.TahoeLogFileIndex"
override val scanClassName: String = "org.apache.spark.sql.delta.DeltaParquetFileFormat"

override def createDataSourceTransformer(
batchScan: FileSourceScanExec,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package io.glutenproject.extension

import io.glutenproject.execution.{FileSourceScanExecTransformer, ProjectExecTransformer}
import io.glutenproject.execution.{DeltaScanTransformer, ProjectExecTransformer}
import io.glutenproject.extension.DeltaRewriteTransformerRules.columnMappingRule
import io.glutenproject.extension.columnar.TransformHints

Expand Down Expand Up @@ -53,7 +53,7 @@ object DeltaRewriteTransformerRules {
// If it enables Delta Column Mapping(e.g. nameMapping and idMapping),
// transform the metadata of Delta into Parquet's,
// so that gluten can read Delta File using Parquet Reader.
case p: FileSourceScanExecTransformer
case p: DeltaScanTransformer
if isDeltaColumnMappingFileFormat(p.relation.fileFormat) && notAppliedColumnMappingRule(
p) =>
transformColumnMappingPlan(p)
Expand All @@ -71,7 +71,7 @@ object DeltaRewriteTransformerRules {
* transform the metadata of Delta into Parquet's, each plan should only be transformed once.
*/
private def transformColumnMappingPlan(plan: SparkPlan): SparkPlan = plan match {
case plan: FileSourceScanExecTransformer =>
case plan: DeltaScanTransformer =>
val fmt = plan.relation.fileFormat.asInstanceOf[DeltaParquetFileFormat]
// a mapping between the table schemas name to parquet schemas.
val columnNameMapping = mutable.Map.empty[String, String]
Expand Down Expand Up @@ -136,7 +136,7 @@ object DeltaRewriteTransformerRules {
val newRequiredFields = plan.requiredSchema.map {
e => StructField(columnNameMapping(e.name), e.dataType, e.nullable, e.metadata)
}
val scanExecTransformer = new FileSourceScanExecTransformer(
val scanExecTransformer = new DeltaScanTransformer(
newFsRelation,
newOutput,
StructType(newRequiredFields),
Expand Down

0 comments on commit 77c25f3

Please sign in to comment.