From 47e00234ef828fe15fd3b716dd1ae3911371e6eb Mon Sep 17 00:00:00 2001
From: lazymam500
Date: Wed, 18 Mar 2015 18:34:06 +0800
Subject: [PATCH] fix scala style,add config flag,break the chaining

---
 .../apache/spark/sql/hive/TableReader.scala   | 48 +++++++++++--------
 1 file changed, 27 insertions(+), 21 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 6963e7b6bc6d9..8d6a1b728c9a4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -142,39 +142,45 @@ class HadoopTableReader(
       partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]],
       filterOpt: Option[PathFilter]): RDD[Row] = {
-    // SPARK-5068:get FileStatus and do the filtering locally when the path is not exists
-
-    var existPathSet =collection.mutable.Set[String]()
-    var pathPatternSet = collection.mutable.Set[String]()
-
-    val hivePartitionRDDs = partitionToDeserializer.filter {
-      case (partition, partDeserializer) =>
-
-        def updateExistPathSetByPathPattern(pathPatternStr:String ){
+
+    //SPARK-5068:get FileStatus and do the filtering locally when the path is not exists
+    def verifyPartitionPath(
+        partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]]):
+        Map[HivePartition, Class[_ <: Deserializer]] = {
+      if (!sc.getConf("spark.sql.hive.verifyPartitionPath", "true").toBoolean) {
+        partitionToDeserializer
+      } else {
+        var existPathSet = collection.mutable.Set[String]()
+        var pathPatternSet = collection.mutable.Set[String]()
+        partitionToDeserializer.filter {
+          case (partition, partDeserializer) =>
+            def updateExistPathSetByPathPattern(pathPatternStr: String) {
           val pathPattern = new Path(pathPatternStr)
           val fs = pathPattern.getFileSystem(sc.hiveconf)
-          val matchs = fs.globStatus(pathPattern);
-          matchs.map( fileStatus =>(existPathSet+= fileStatus.getPath.toString))
+          val matches = fs.globStatus(pathPattern)
+          matches.map(fileStatus => existPathSet += fileStatus.getPath.toString)
         }
         // convert  /demo/data/year/month/day  to  /demo/data/**/**/**/
-        def getPathPatternByPath(parNum:Int,tpath:Path):String = {
-          var path = tpath
-          for (i <- (1 to parNum)) { path = path.getParent }
-          val tails = (1 to parNum).map(_ => "*").mkString("/","/","/")
+        def getPathPatternByPath(parNum: Int, tempPath: Path): String = {
+          var path = tempPath
+          for (i <- (1 to parNum)) path = path.getParent
+          val tails = (1 to parNum).map(_ => "*").mkString("/", "/", "/")
           path.toString + tails
         }
 
         val partPath = HiveShim.getDataLocationPath(partition)
         val partNum = Utilities.getPartitionDesc(partition).getPartSpec.size();
-        var pathPatternStr = getPathPatternByPath(partNum,partPath)
-        if(!pathPatternSet.contains(pathPatternStr)){
-          pathPatternSet+=pathPatternStr
+        var pathPatternStr = getPathPatternByPath(partNum, partPath)
+        if (!pathPatternSet.contains(pathPatternStr)) {
+          pathPatternSet += pathPatternStr
           updateExistPathSetByPathPattern(pathPatternStr)
         }
-        existPathSet.contains(partPath.toString)
+            existPathSet.contains(partPath.toString)
+        }
+      }
+    }
 
-    }
-    .map { case (partition, partDeserializer) =>
+    val hivePartitionRDDs = verifyPartitionPath(partitionToDeserializer).map {
       val partDesc = Utilities.getPartitionDesc(partition)
       val partPath = HiveShim.getDataLocationPath(partition)
       val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
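
Note on the new flag: the patch gates the SPARK-5068 partition-path check
behind "spark.sql.hive.verifyPartitionPath" (default "true"), read via
sc.getConf in TableReader. A minimal sketch of how a user could turn the
check off, assuming a Spark 1.3-era HiveContext; the variable names here
are illustrative only:

    import org.apache.spark.sql.hive.HiveContext

    // `sparkContext` is assumed to be an already-running SparkContext.
    val hiveContext = new HiveContext(sparkContext)
    // Flag added by this patch; defaults to "true". Setting it to "false"
    // skips the local globStatus filtering of partition paths entirely.
    hiveContext.setConf("spark.sql.hive.verifyPartitionPath", "false")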
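Worked example of the glob construction: getPathPatternByPath climbs one
parent directory per partition column and rebuilds that depth as wildcards,
so a single globStatus call per pattern can list every partition directory
that actually exists. A standalone sketch, assuming only hadoop-common's
Path on the classpath:

    import org.apache.hadoop.fs.Path

    def getPathPatternByPath(parNum: Int, tempPath: Path): String = {
      var path = tempPath
      // climb up one directory per partition column
      for (i <- 1 to parNum) path = path.getParent
      // rebuild the partition depth as wildcards: "/*/.../*/"
      val tails = (1 to parNum).map(_ => "*").mkString("/", "/", "/")
      path.toString + tails
    }

    // For a table partitioned by year/month/day (parNum = 3):
    // getPathPatternByPath(3, new Path("/demo/data/2015/03/18"))
    // returns "/demo/data/*/*/*/"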