Skip to content

Commit

Permalink
Make sure getBlockLocations uses offset and length to find the blocks on HDFS
Browse files Browse the repository at this point in the history
  • Loading branch information
harishreedharan committed Oct 25, 2014
1 parent eadde56 commit 5cce16f
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
Expand Up @@ -86,7 +86,8 @@ class HDFSBackedBlockRDD[T: ClassTag](
val partition = split.asInstanceOf[HDFSBackedBlockRDDPartition]
val locations = getBlockIdLocations()
locations.getOrElse(partition.blockId,
HdfsUtils.getBlockLocations(partition.segment.path, hadoopConfiguration)
HdfsUtils.getBlockLocations(partition.segment.path, partition.segment.offset,
partition.segment.length, hadoopConfiguration)
.getOrElse(new Array[String](0)).toSeq)
}
}
Expand Up @@ -52,11 +52,12 @@ private[streaming] object HdfsUtils {
}
}

def getBlockLocations(path: String, conf: Configuration): Option[Array[String]] = {
def getBlockLocations(path: String, offset: Long, length: Long, conf: Configuration):
Option[Array[String]] = {
val dfsPath = new Path(path)
val dfs = getFileSystemForPath(dfsPath, conf)
val fileStatus = dfs.getFileStatus(dfsPath)
val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen))
val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, offset, length))
blockLocs.map(_.flatMap(_.getHosts))
}

Expand Down

0 comments on commit 5cce16f

Please sign in to comment.