@@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.datasources

import scala.collection.mutable

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.{InputFileNameHolder, RDD}
import org.apache.spark.sql.SQLContext
Expand All @@ -31,7 +33,8 @@ case class PartitionedFile(
partitionValues: InternalRow,
filePath: String,
start: Long,
length: Long) {
length: Long,
locations: Array[String] = Array.empty) {
override def toString: String = {
s"path: $filePath, range: $start-${start + length}, partition values: $partitionValues"
}
@@ -85,4 +88,23 @@ class FileScanRDD(
}

override protected def getPartitions: Array[Partition] = filePartitions.toArray

override protected def getPreferredLocations(split: Partition): Seq[String] = {
val files = split.asInstanceOf[FilePartition].files

// Computes the total number of bytes that can be retrieved from each host.
val hostToNumBytes = mutable.HashMap.empty[String, Long]
files.foreach { file =>
file.locations foreach { host =>
hostToNumBytes(host) = hostToNumBytes.getOrElse(host, 0L) + file.length
}
}

// Takes the first 3 hosts with the most data to be retrieved
hostToNumBytes.toSeq.sortBy {
case (host, numBytes) => numBytes
}.reverse.take(3).map {
case (host, numBytes) => host
}
}
}
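To see what the new getPreferredLocations contributes, the following standalone sketch (not part of this patch; FakeFile and topHosts are hypothetical names) reproduces the bytes-per-host weighting and the top-3 selection over a partition's files:

import scala.collection.mutable

case class FakeFile(length: Long, locations: Array[String])

def topHosts(files: Seq[FakeFile]): Seq[String] = {
  // Accumulate how many bytes each host can serve locally.
  val hostToNumBytes = mutable.HashMap.empty[String, Long]
  files.foreach { file =>
    file.locations.foreach { host =>
      hostToNumBytes(host) = hostToNumBytes.getOrElse(host, 0L) + file.length
    }
  }
  // Hosts holding the most bytes come first; keep at most three.
  hostToNumBytes.toSeq.sortBy { case (_, numBytes) => -numBytes }.take(3).map(_._1)
}

// A 128 MB split replicated on host1/host2 plus a 64 MB split on host2/host3:
// host2 accumulates 192 MB and is ranked first.
topHosts(Seq(
  FakeFile(128L << 20, Array("host1", "host2")),
  FakeFile(64L << 20, Array("host2", "host3"))))
// => Seq("host2", "host1", "host3")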
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}

import org.apache.spark.internal.Logging
import org.apache.spark.sql._
@@ -120,7 +120,10 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
logInfo(s"Planning with ${bucketing.numBuckets} buckets")
val bucketed =
selectedPartitions.flatMap { p =>
p.files.map(f => PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen))
p.files.map { f =>
val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen)
PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen, hosts)
}
}.groupBy { f =>
BucketingUtils
.getBucketId(new Path(f.filePath).getName)
@@ -139,10 +142,12 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {

val splitFiles = selectedPartitions.flatMap { partition =>
partition.files.flatMap { file =>
(0L to file.getLen by maxSplitBytes).map { offset =>
val blockLocations = getBlockLocations(file)
(0L until file.getLen by maxSplitBytes).map { offset =>
val remaining = file.getLen - offset
val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
PartitionedFile(partition.values, file.getPath.toUri.toString, offset, size)
val hosts = getBlockHosts(blockLocations, offset, size)
PartitionedFile(partition.values, file.getPath.toUri.toString, offset, size, hosts)
}
}
}.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
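Note that switching from `to` to `until` stops the loop from emitting a zero-length trailing split when file.getLen is an exact multiple of maxSplitBytes (and from emitting one empty split for empty files). As a quick illustration of the splitting itself (not part of this patch; the sizes are made up), a 300 MB file with a 128 MB maxSplitBytes yields three splits:

// Illustrative only: mirrors the offset/size arithmetic of the flatMap above.
val maxSplitBytes = 128L * 1024 * 1024
val fileLen = 300L * 1024 * 1024
val splits = (0L until fileLen by maxSplitBytes).map { offset =>
  val remaining = fileLen - offset
  val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
  (offset, size)
}
// => Vector((0, 134217728), (134217728, 134217728), (268435456, 46137344)),
//    i.e. two full 128 MB splits plus a trailing 44 MB split.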
@@ -207,4 +212,43 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {

case _ => Nil
}

private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match {
case f: LocatedFileStatus => f.getBlockLocations
case f => Array.empty[BlockLocation]
}

// Given locations of all blocks of a single file, `blockLocations`, and an `(offset, length)`
// pair that represents a segment of the same file, finds the block that contains the largest
// fraction of the segment, and returns the location hosts of that block. If no such block can be
// found, returns an empty array.
private def getBlockHosts(
blockLocations: Array[BlockLocation], offset: Long, length: Long): Array[String] = {
val candidates = blockLocations.map {
// The fragment starts from a position within this block
case b if b.getOffset <= offset && offset < b.getOffset + b.getLength =>
b.getHosts -> (b.getOffset + b.getLength - offset).min(length)

// The fragment ends at a position within this block
case b if offset <= b.getOffset && offset + length < b.getOffset + b.getLength =>
b.getHosts -> (offset + length - b.getOffset).min(length)

// The fragment fully contains this block
case b if offset <= b.getOffset && b.getOffset + b.getLength <= offset + length =>
b.getHosts -> b.getLength

// The fragment doesn't intersect with this block
case b =>
b.getHosts -> 0L
}.filter { case (hosts, size) =>
size > 0L
}

if (candidates.isEmpty) {
Array.empty[String]
} else {
val (hosts, _) = candidates.maxBy { case (_, size) => size }
hosts
}
}
}
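For intuition about getBlockHosts, the sketch below (not part of this patch; hosts and sizes are made up) weighs two hypothetical 128 MB HDFS blocks against one split using a generic overlap formula that mirrors the case analysis above, then keeps the hosts of the block with the largest overlap:

import org.apache.hadoop.fs.BlockLocation

// Two adjacent 128 MB blocks of the same file.
val blocks = Array(
  new BlockLocation(Array("host1:9866"), Array("host1"), 0L, 128L << 20),
  new BlockLocation(Array("host2:9866"), Array("host2"), 128L << 20, 128L << 20))

// A 128 MB split starting at offset 64 MB overlaps each block by 64 MB.
val (offset, length) = (64L << 20, 128L << 20)
val candidates = blocks.map { b =>
  val start = math.max(offset, b.getOffset)
  val end = math.min(offset + length, b.getOffset + b.getLength)
  b.getHosts -> math.max(0L, end - start)
}
// maxBy keeps the first maximum on a tie, so this split is preferred on host1.
val hosts = candidates.maxBy { case (_, size) => size }._1   // => Array("host1")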
@@ -21,13 +21,12 @@ import scala.collection.mutable
import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.fs._
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}

import org.apache.spark.SparkContext
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
@@ -38,7 +37,6 @@ import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.SerializableConfiguration
import org.apache.spark.util.collection.BitSet

/**
* ::DeveloperApi::
@@ -625,16 +623,31 @@ class HDFSFileCatalog(
if (paths.length >= sqlContext.conf.parallelPartitionDiscoveryThreshold) {
HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sqlContext.sparkContext)
Contributor: Seems we also need to update the listLeafFiles that is called by listLeafFilesInParallel.

Contributor: Let's also have a test.
} else {
val statuses = paths.flatMap { path =>
val statuses: Seq[FileStatus] = paths.flatMap { path =>
val fs = path.getFileSystem(hadoopConf)
logInfo(s"Listing $path on driver")
// Dummy jobconf to get to the pathFilter defined in configuration
val jobConf = new JobConf(hadoopConf, this.getClass())
val jobConf = new JobConf(hadoopConf, this.getClass)
val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
if (pathFilter != null) {
Try(fs.listStatus(path, pathFilter)).getOrElse(Array.empty)
} else {
Try(fs.listStatus(path)).getOrElse(Array.empty)

val statuses = {
val stats = Try(fs.listStatus(path)).getOrElse(Array.empty)
if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
}

statuses.map {
case f: LocatedFileStatus => f

// NOTE:
//
// - Although the S3/S3A/S3N file systems can be quite slow for remote file metadata
// operations, calling `getFileBlockLocations` does no harm here since these file system
// implementations don't actually issue RPCs for this method.
//
// - Here we are calling `getFileBlockLocations` in a sequential manner, but it shouldn't be
// a big deal since we always use `listLeafFilesInParallel` when the number of paths
// exceeds the threshold.
case f => new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
}
}.filterNot { status =>
val name = status.getPath.getName
@@ -652,7 +665,7 @@ class HDFSFileCatalog(
}
}

def inferPartitioning(schema: Option[StructType]): PartitionSpec = {
def inferPartitioning(schema: Option[StructType]): PartitionSpec = {
// We use leaf dirs containing data files to discover the schema.
val leafDirs = leafDirToChildrenFiles.keys.toSeq
schema match {
@@ -754,32 +767,39 @@ private[sql] object HadoopFsRelation extends Logging {
Array.empty
} else {
// Dummy jobconf to get to the pathFilter defined in configuration
val jobConf = new JobConf(fs.getConf, this.getClass())
val jobConf = new JobConf(fs.getConf, this.getClass)
val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
val statuses =
if (pathFilter != null) {
val (dirs, files) = fs.listStatus(status.getPath, pathFilter).partition(_.isDirectory)
files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
} else {
val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
}
statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
val statuses = {
val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
val stats = files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
}
statuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
case f: LocatedFileStatus => f
case f => new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
}
}
}

// `FileStatus` is Writable but not serializable. What makes it worse, somehow it doesn't play
// well with `SerializableWritable`. So there seems to be no way to serialize a `FileStatus`.
// Here we use `FakeFileStatus` to extract the key components of a `FileStatus`, serialize them on
// the executor side, and reconstruct the `FileStatus` on the driver side.
case class FakeBlockLocation(
names: Array[String],
hosts: Array[String],
offset: Long,
length: Long)

case class FakeFileStatus(
path: String,
length: Long,
isDir: Boolean,
blockReplication: Short,
blockSize: Long,
modificationTime: Long,
accessTime: Long)
accessTime: Long,
blockLocations: Array[FakeBlockLocation])

def listLeafFilesInParallel(
paths: Seq[Path],
@@ -794,19 +814,40 @@ private[sql] object HadoopFsRelation extends Logging {
val fs = path.getFileSystem(serializableConfiguration.value)
Try(listLeafFiles(fs, fs.getFileStatus(path))).getOrElse(Array.empty)
}.map { status =>
val blockLocations = status match {
case f: LocatedFileStatus =>
f.getBlockLocations.map { loc =>
FakeBlockLocation(
loc.getNames,
loc.getHosts,
loc.getOffset,
loc.getLength)
}

case _ =>
Array.empty[FakeBlockLocation]
}

FakeFileStatus(
status.getPath.toString,
status.getLen,
status.isDirectory,
status.getReplication,
status.getBlockSize,
status.getModificationTime,
status.getAccessTime)
status.getAccessTime,
blockLocations)
}.collect()

val hadoopFakeStatuses = fakeStatuses.map { f =>
new FileStatus(
f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path))
val blockLocations = f.blockLocations.map { loc =>
new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
}
new LocatedFileStatus(
new FileStatus(
f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path)),
blockLocations
)
}
mutable.LinkedHashSet(hadoopFakeStatuses: _*)
}