Skip to content

Commit

Permalink
fix scala style,add config flag,break the chaining
Browse files Browse the repository at this point in the history
  • Loading branch information
lazyman500 committed Mar 18, 2015
1 parent 04c443c commit 47e0023
Showing 1 changed file with 27 additions and 21 deletions.
48 changes: 27 additions & 21 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -142,39 +142,45 @@ class HadoopTableReader(
partitionToDeserializer: Map[HivePartition,
Class[_ <: Deserializer]],
filterOpt: Option[PathFilter]): RDD[Row] = {
// SPARK-5068:get FileStatus and do the filtering locally when the path is not exists

var existPathSet =collection.mutable.Set[String]()
var pathPatternSet = collection.mutable.Set[String]()

val hivePartitionRDDs = partitionToDeserializer.filter {
case (partition, partDeserializer) =>

def updateExistPathSetByPathPattern(pathPatternStr:String ){

//SPARK-5068:get FileStatus and do the filtering locally when the path is not exists
def verifyPartitionPath(
partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]]):
Map[HivePartition, Class[_ <: Deserializer]] = {
if (!sc.getConf("spark.sql.hive.verifyPartitionPath", "true").toBoolean) {
partitionToDeserializer
} else {
var existPathSet = collection.mutable.Set[String]()
var pathPatternSet = collection.mutable.Set[String]()
partitionToDeserializer.filter {
case (partition, partDeserializer) =>
def updateExistPathSetByPathPattern(pathPatternStr: String) {
val pathPattern = new Path(pathPatternStr)
val fs = pathPattern.getFileSystem(sc.hiveconf)
val matchs = fs.globStatus(pathPattern);
matchs.map( fileStatus =>(existPathSet+= fileStatus.getPath.toString))
val matches = fs.globStatus(pathPattern)
matches.map(fileStatus => existPathSet += fileStatus.getPath.toString)
}
// convert /demo/data/year/month/day to /demo/data/**/**/**/
def getPathPatternByPath(parNum:Int,tpath:Path):String = {
var path = tpath
for (i <- (1 to parNum)) { path = path.getParent }
val tails = (1 to parNum).map(_ => "*").mkString("/","/","/")
def getPathPatternByPath(parNum: Int, tempPath: Path): String = {
var path = tempPath
for (i <- (1 to parNum)) path = path.getParent
val tails = (1 to parNum).map(_ => "*").mkString("/", "/", "/")
path.toString + tails
}

val partPath = HiveShim.getDataLocationPath(partition)
val partNum = Utilities.getPartitionDesc(partition).getPartSpec.size();
var pathPatternStr = getPathPatternByPath(partNum,partPath)
if(!pathPatternSet.contains(pathPatternStr)){
pathPatternSet+=pathPatternStr
var pathPatternStr = getPathPatternByPath(partNum, partPath)
if (!pathPatternSet.contains(pathPatternStr)) {
pathPatternSet += pathPatternStr
updateExistPathSetByPathPattern(pathPatternStr)
}
existPathSet.contains(partPath.toString)
existPathSet.contains(partPath.toString)
}
}
}

}
.map { case (partition, partDeserializer) =>
val hivePartitionRDDs = verifyPartitionPath(partitionToDeserializer).map {
val partDesc = Utilities.getPartitionDesc(partition)
val partPath = HiveShim.getDataLocationPath(partition)
val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
Expand Down

0 comments on commit 47e0023

Please sign in to comment.