[SPARK-41970][SQL][FOLLOWUP] Revert SparkPath changes to FileIndex and FileRelation

### What changes were proposed in this pull request?
This PR reverts the `SparkPath` changes to `FileIndex` and `FileRelation`. The changes provided little benefit to open source Spark, while both interfaces are widely used extension points for other open source projects. For the 3.4.0 release we want to preserve this type of binary compatibility.

That said, we reserve the right to make this change for Spark 4.0.

### Why are the changes needed?
Revert `inputFiles: Array[SparkPath]` back to `inputFiles: Array[String]`, with an explicit comment that the strings are expected to be url-encoded.
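Sketched below (not part of the commit) is what the reverted contract looks like from a caller's perspective: `inputFiles` returns url-encoded strings, and going back through `java.net.URI` recovers a usable Hadoop `Path`, mirroring the `AvroRowReaderSuite` change in this commit. The helper name is illustrative.

```scala
import java.net.URI

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.DataFrame

// Illustrative helper: decode the url-encoded strings returned by
// Dataset.inputFiles back into Hadoop Paths. Going through java.net.URI
// first decodes percent-escapes such as "%20" before building the Path.
def inputPathsOf(df: DataFrame): Array[Path] =
  df.inputFiles.map(s => new Path(new URI(s)))
```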

### Does this PR introduce _any_ user-facing change?
No. This only reverts an internal interface change.

### How was this patch tested?
Existing unit tests.

Closes #39808 from databricks-david-lewis/SPARK_PATH_FOLLOWUP.

Authored-by: David Lewis <david.lewis@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 3887e71)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
databricks-david-lewis authored and HyukjinKwon committed Jan 31, 2023
1 parent 42e183f commit b553b0e
Showing 7 changed files with 20 additions and 18 deletions.

AvroRowReaderSuite.scala
@@ -18,11 +18,13 @@
 package org.apache.spark.sql.avro

 import java.io._
+import java.net.URI

 import org.apache.avro.file.DataFileReader
 import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
 import org.apache.avro.mapred.FsInput
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path

 import org.apache.spark.SparkConf
 import org.apache.spark.sql._
@@ -60,8 +62,10 @@ class AvroRowReaderSuite
       case BatchScanExec(_, f: AvroScan, _, _, _, _, _) => f
     }
     val filePath = fileScan.get.fileIndex.inputFiles(0)
-    val fileSize = new File(filePath.toUri).length
-    val in = new FsInput(filePath.toPath, new Configuration())
+    val fileSize = new File(new URI(filePath)).length
+    // scalastyle:off pathfromuri
+    val in = new FsInput(new Path(new URI(filePath)), new Configuration())
+    // scalastyle:on pathfromuri
     val reader = DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]())

     val it = new Iterator[InternalRow] with AvroUtils.RowReader {
7 changes: 3 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -34,7 +34,6 @@ import org.apache.spark.api.java.function._
 import org.apache.spark.api.python.{PythonRDD, SerDeUtil}
 import org.apache.spark.api.r.RRDD
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.paths.SparkPath
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, QueryPlanningTracker, ScalaReflection, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
@@ -3925,18 +3924,18 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   def inputFiles: Array[String] = {
-    val files: Seq[SparkPath] = queryExecution.optimizedPlan.collect {
+    val files: Seq[String] = queryExecution.optimizedPlan.collect {
       case LogicalRelation(fsBasedRelation: FileRelation, _, _, _) =>
         fsBasedRelation.inputFiles
       case fr: FileRelation =>
         fr.inputFiles
       case r: HiveTableRelation =>
-        r.tableMeta.storage.locationUri.map(SparkPath.fromUri).toArray
+        r.tableMeta.storage.locationUri.map(_.toString).toArray
       case DataSourceV2ScanRelation(DataSourceV2Relation(table: FileTable, _, _, _, _),
         _, _, _, _) =>
         table.fileIndex.inputFiles
     }.flatten
-    files.iterator.map(_.urlEncoded).toSet.toArray
+    files.toSet.toArray
   }

   /**

sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala
@@ -17,14 +17,15 @@

 package org.apache.spark.sql.execution

-import org.apache.spark.paths.SparkPath
-
 /**
  * An interface for relations that are backed by files. When a class implements this interface,
  * the list of paths that it returns will be returned to a user who calls `inputPaths` on any
  * DataFrame that queries this relation.
  */
 trait FileRelation {
-  /** Returns the list of files that will be read when scanning this relation. */
-  def inputFiles: Array[SparkPath]
+  /**
+   * Returns the list of files that will be read when scanning this relation.
+   * The strings returned are expected to be url-encoded paths.
+   */
+  def inputFiles: Array[String]
 }
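For context on why this signature matters, here is a hypothetical third-party relation written against the reverted `Array[String]` contract; the class is made up for illustration, but this is the kind of external implementation the revert keeps source- and binary-compatible for 3.4.0:

```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.execution.FileRelation

// Hypothetical external data source that tracks its own file listing.
class ExternalFilesRelation(files: Seq[FileStatus]) extends FileRelation {
  // Per the reverted contract, return url-encoded path strings
  // (Path.toUri escapes special characters such as spaces).
  override def inputFiles: Array[String] =
    files.map(_.getPath.toUri.toString).toArray
}
```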

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -22,7 +22,6 @@ import java.net.URI
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path

-import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, ExternalCatalogUtils}
 import org.apache.spark.sql.catalyst.expressions._
@@ -95,7 +94,7 @@ class CatalogFileIndex(
     }
   }

-  override def inputFiles: Array[SparkPath] = filterPartitions(Nil).inputFiles
+  override def inputFiles: Array[String] = filterPartitions(Nil).inputFiles

   // `CatalogFileIndex` may be a member of `HadoopFsRelation`, `HadoopFsRelation` may be a member
   // of `LogicalRelation`, and `LogicalRelation` may be used as the cache key. So we need to

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.datasources

 import org.apache.hadoop.fs._

-import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types.StructType
@@ -62,8 +61,9 @@ trait FileIndex {
   /**
    * Returns the list of files that will be read when scanning this relation. This call may be
    * very expensive for large tables.
+   * The strings returned are expected to be url-encoded paths.
    */
-  def inputFiles: Array[SparkPath]
+  def inputFiles: Array[String]

   /** Refresh any cached file listings */
   def refresh(): Unit

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -17,7 +17,6 @@

 package org.apache.spark.sql.execution.datasources

-import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.execution.FileRelation
@@ -71,5 +70,5 @@ case class HadoopFsRelation(
   }


-  override def inputFiles: Array[SparkPath] = location.inputFiles
+  override def inputFiles: Array[String] = location.inputFiles
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -137,8 +137,8 @@ abstract class PartitioningAwareFileIndex(
   }

   /** Returns the list of files that will be read when scanning this relation. */
-  override def inputFiles: Array[SparkPath] =
-    allFiles().map(SparkPath.fromFileStatus).toArray
+  override def inputFiles: Array[String] =
+    allFiles().map(fs => SparkPath.fromFileStatus(fs).urlEncoded).toArray

   override def sizeInBytes: Long = allFiles().map(_.getLen).sum

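A brief sketch of the producer side of the same contract, following the `PartitioningAwareFileIndex` change above: implementations can keep using `SparkPath` internally and only convert to url-encoded strings at the public boundary. The function name is illustrative.

```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.paths.SparkPath

// Illustrative: FileStatus -> SparkPath -> url-encoded String, the same
// conversion PartitioningAwareFileIndex.inputFiles performs above.
// SparkPath itself remains available internally; only the public
// FileIndex/FileRelation signatures revert to Array[String].
def toInputFile(status: FileStatus): String =
  SparkPath.fromFileStatus(status).urlEncoded
```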
