From 04c443cc039814d0e1f2ab2ec636ad00256d63fc Mon Sep 17 00:00:00 2001
From: lazyman
Date: Tue, 17 Mar 2015 10:56:53 +0800
Subject: [PATCH 1/5] SPARK-5068: fix bug when partition path doesn't exist #2

---
 .../apache/spark/sql/hive/TableReader.scala   | 34 +++++++++-
 .../spark/sql/hive/QueryPartitionSuite.scala  | 64 +++++++++++++++++++
 2 files changed, 97 insertions(+), 1 deletion(-)
 create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index f22c9eaeedc7d..6963e7b6bc6d9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -142,7 +142,39 @@ class HadoopTableReader(
       partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]],
       filterOpt: Option[PathFilter]): RDD[Row] = {
 
-    val hivePartitionRDDs = partitionToDeserializer.map { case (partition, partDeserializer) =>
+    // SPARK-5068:get FileStatus and do the filtering locally when the path is not exists
+
+    var existPathSet =collection.mutable.Set[String]()
+    var pathPatternSet = collection.mutable.Set[String]()
+
+    val hivePartitionRDDs = partitionToDeserializer.filter {
+      case (partition, partDeserializer) =>
+
+        def updateExistPathSetByPathPattern(pathPatternStr:String ){
+          val pathPattern = new Path(pathPatternStr)
+          val fs = pathPattern.getFileSystem(sc.hiveconf)
+          val matchs = fs.globStatus(pathPattern);
+          matchs.map( fileStatus =>(existPathSet+= fileStatus.getPath.toString))
+        }
+        // convert /demo/data/year/month/day to /demo/data/**/**/**/
+        def getPathPatternByPath(parNum:Int,tpath:Path):String = {
+          var path = tpath
+          for (i <- (1 to parNum)) { path = path.getParent }
+          val tails = (1 to parNum).map(_ => "*").mkString("/","/","/")
+          path.toString + tails
+        }
+
+        val partPath = HiveShim.getDataLocationPath(partition)
+        val partNum = Utilities.getPartitionDesc(partition).getPartSpec.size();
+        var pathPatternStr = getPathPatternByPath(partNum,partPath)
+        if(!pathPatternSet.contains(pathPatternStr)){
+          pathPatternSet+=pathPatternStr
+          updateExistPathSetByPathPattern(pathPatternStr)
+        }
+        existPathSet.contains(partPath.toString)
+
+    }
+    .map { case (partition, partDeserializer) =>
       val partDesc = Utilities.getPartitionDesc(partition)
       val partPath = HiveShim.getDataLocationPath(partition)
       val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
new file mode 100644
index 0000000000000..8b90bfb19fde8
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import com.google.common.io.Files
+import org.apache.spark.sql.{QueryTest, _}
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.util.Utils
+/* Implicits */
+import org.apache.spark.sql.hive.test.TestHive._
+
+
+
+class QueryPartitionSuite extends QueryTest {
+  import org.apache.spark.sql.hive.test.TestHive.implicits._
+
+  test("SPARK-5068: query data when path doesn't exist") {
+    val testData = TestHive.sparkContext.parallelize(
+      (1 to 10).map(i => TestData(i, i.toString))).toDF()
+    testData.registerTempTable("testData")
+
+    val tmpDir = Files.createTempDir()
+    // create the table for the test
+    sql(s"CREATE TABLE table_with_partition(key int,value string) PARTITIONED by (ds string) location '${tmpDir.toURI.toString}' ")
+    sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1') SELECT key,value FROM testData")
+    sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='2') SELECT key,value FROM testData")
+    sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='3') SELECT key,value FROM testData")
+    sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='4') SELECT key,value FROM testData")
+
+    // test while all partition paths still exist
+    checkAnswer(sql("select key,value from table_with_partition"),
+      testData.toSchemaRDD.collect ++ testData.toSchemaRDD.collect
+      ++ testData.toSchemaRDD.collect ++ testData.toSchemaRDD.collect)
+
+    // delete the path of one partition
+    val folders = tmpDir.listFiles.filter(_.isDirectory)
+    Utils.deleteRecursively(folders(0))
+
+    // test again after the path has been deleted
+    checkAnswer(sql("select key,value from table_with_partition"),
+      testData.toSchemaRDD.collect ++ testData.toSchemaRDD.collect
+      ++ testData.toSchemaRDD.collect)
+
+    sql("DROP TABLE table_with_partition")
+    sql("DROP TABLE createAndInsertTest")
+  }
+}
\ No newline at end of file
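
Note on the hunk above: it generalizes each partition location into a glob over its partition columns (/demo/data/year/month/day becomes /demo/data/*/*/*/), expands each distinct glob once with FileSystem.globStatus, and keeps only the partitions whose directories were actually found. A standalone sketch of the same idea follows; the object and parameter names are illustrative, and the Option(...) guard around globStatus, which can return null when nothing matches, is an addition of this sketch rather than patch code:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileStatus, Path}

    object GlobPruneSketch {
      // Keep only the partition paths whose directories exist on the filesystem.
      def prune(partPaths: Seq[Path], partCols: Int, conf: Configuration): Seq[Path] = {
        val existing = collection.mutable.Set[String]()
        val seenPatterns = collection.mutable.Set[String]()
        partPaths.foreach { partPath =>
          // Generalize /demo/data/year/month/day into /demo/data/*/*/*
          var base = partPath
          (1 to partCols).foreach(_ => base = base.getParent)
          val pattern = base.toString + (1 to partCols).map(_ => "*").mkString("/", "/", "")
          // Expand each distinct pattern only once, however many partitions share it.
          if (seenPatterns.add(pattern)) {
            val fs = new Path(pattern).getFileSystem(conf)
            Option(fs.globStatus(new Path(pattern))).getOrElse(Array.empty[FileStatus])
              .foreach(status => existing += status.getPath.toString)
          }
        }
        partPaths.filter(p => existing.contains(p.toString))
      }
    }

One filesystem round-trip per pattern, instead of one existence check per partition, is what keeps the approach cheap on tables with many partitions.
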
From 47e00234ef828fe15fd3b716dd1ae3911371e6eb Mon Sep 17 00:00:00 2001
From: lazymam500
Date: Wed, 18 Mar 2015 18:34:06 +0800
Subject: [PATCH 2/5] fix Scala style, add config flag, break the chaining

---
 .../apache/spark/sql/hive/TableReader.scala   | 48 +++++++++++--------
 1 file changed, 27 insertions(+), 21 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 6963e7b6bc6d9..8d6a1b728c9a4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -142,39 +142,45 @@ class HadoopTableReader(
       partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]],
       filterOpt: Option[PathFilter]): RDD[Row] = {
 
-    // SPARK-5068:get FileStatus and do the filtering locally when the path is not exists
-
-    var existPathSet =collection.mutable.Set[String]()
-    var pathPatternSet = collection.mutable.Set[String]()
-
-    val hivePartitionRDDs = partitionToDeserializer.filter {
-      case (partition, partDeserializer) =>
-
-        def updateExistPathSetByPathPattern(pathPatternStr:String ){
+
+    //SPARK-5068:get FileStatus and do the filtering locally when the path is not exists
+    def verifyPartitionPath(
+        partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]]):
+      Map[HivePartition, Class[_ <: Deserializer]] = {
+      if (!sc.getConf("spark.sql.hive.verifyPartitionPath", "true").toBoolean) {
+        partitionToDeserializer
+      } else {
+        var existPathSet = collection.mutable.Set[String]()
+        var pathPatternSet = collection.mutable.Set[String]()
+        partitionToDeserializer.filter {
+          case (partition, partDeserializer) =>
+            def updateExistPathSetByPathPattern(pathPatternStr: String) {
               val pathPattern = new Path(pathPatternStr)
               val fs = pathPattern.getFileSystem(sc.hiveconf)
-          val matchs = fs.globStatus(pathPattern);
-          matchs.map( fileStatus =>(existPathSet+= fileStatus.getPath.toString))
+              val matches = fs.globStatus(pathPattern)
+              matches.map(fileStatus => existPathSet += fileStatus.getPath.toString)
             }
             // convert /demo/data/year/month/day to /demo/data/**/**/**/
-        def getPathPatternByPath(parNum:Int,tpath:Path):String = {
-          var path = tpath
-          for (i <- (1 to parNum)) { path = path.getParent }
-          val tails = (1 to parNum).map(_ => "*").mkString("/","/","/")
+            def getPathPatternByPath(parNum: Int, tempPath: Path): String = {
+              var path = tempPath
+              for (i <- (1 to parNum)) path = path.getParent
+              val tails = (1 to parNum).map(_ => "*").mkString("/", "/", "/")
               path.toString + tails
             }
 
            val partPath = HiveShim.getDataLocationPath(partition)
            val partNum = Utilities.getPartitionDesc(partition).getPartSpec.size();
-        var pathPatternStr = getPathPatternByPath(partNum,partPath)
-        if(!pathPatternSet.contains(pathPatternStr)){
-          pathPatternSet+=pathPatternStr
+            var pathPatternStr = getPathPatternByPath(partNum, partPath)
+            if (!pathPatternSet.contains(pathPatternStr)) {
+              pathPatternSet += pathPatternStr
              updateExistPathSetByPathPattern(pathPatternStr)
            }
-        existPathSet.contains(partPath.toString)
+            existPathSet.contains(partPath.toString)
+        }
+      }
+    }
 
-    }
-    .map { case (partition, partDeserializer) =>
+    val hivePartitionRDDs = verifyPartitionPath(partitionToDeserializer).map {
       val partDesc = Utilities.getPartitionDesc(partition)
       val partPath = HiveShim.getDataLocationPath(partition)
       val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
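
With the flag added above, the verification step can be switched off through spark.sql.hive.verifyPartitionPath, which defaults to true and is read here via sc.getConf. A hypothetical way to turn it off for a job whose partition directories are known to exist, assuming the usual Spark 1.x bootstrap in which the SQL context copies spark.sql.* entries from the SparkConf:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    // Skip the extra globStatus round-trips when every partition path is known to exist.
    val conf = new SparkConf()
      .setAppName("partition-scan")
      .set("spark.sql.hive.verifyPartitionPath", "false")
    val sparkContext = new SparkContext(conf)
    val hiveContext = new HiveContext(sparkContext)
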
From f23133fde4dac1ebeb18ec120c74677b4d9be607 Mon Sep 17 00:00:00 2001
From: lazymam500
Date: Wed, 18 Mar 2015 19:41:52 +0800
Subject: [PATCH 3/5] bug fix: restore the case clause dropped while breaking
 the chaining

---
 .../src/main/scala/org/apache/spark/sql/hive/TableReader.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 8d6a1b728c9a4..b23d7c551f0fd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -180,7 +180,8 @@ class HadoopTableReader(
       }
     }
 
-    val hivePartitionRDDs = verifyPartitionPath(partitionToDeserializer).map {
+    val hivePartitionRDDs = verifyPartitionPath(partitionToDeserializer)
+      .map { case (partition, partDeserializer) =>
       val partDesc = Utilities.getPartitionDesc(partition)
       val partPath = HiveShim.getDataLocationPath(partition)
       val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)

From e1d638678c5f5b44a201d3d37c70683c033f6ed0 Mon Sep 17 00:00:00 2001
From: lazymam500
Date: Wed, 18 Mar 2015 19:55:09 +0800
Subject: [PATCH 4/5] fix scala style

---
 .../src/main/scala/org/apache/spark/sql/hive/TableReader.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index b23d7c551f0fd..b451713279381 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -143,7 +143,7 @@ class HadoopTableReader(
       partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]],
       filterOpt: Option[PathFilter]): RDD[Row] = {
 
-    //SPARK-5068:get FileStatus and do the filtering locally when the path is not exists
+    // SPARK-5068:get FileStatus and do the filtering locally when the path is not exists
     def verifyPartitionPath(
         partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]]):
       Map[HivePartition, Class[_ <: Deserializer]] = {

From 5bfcbfdbdce31d640f9bdf6e874b7f916e89e7ce Mon Sep 17 00:00:00 2001
From: lazyman
Date: Mon, 6 Apr 2015 23:26:34 +0800
Subject: [PATCH 5/5] move spark.sql.hive.verifyPartitionPath to SQLConf, fix
 scala style

---
 sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala   | 6 ++++++
 .../main/scala/org/apache/spark/sql/hive/TableReader.scala   | 6 +++---
 .../org/apache/spark/sql/hive/QueryPartitionSuite.scala      | 2 +-
 3 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 4815620c6fe57..ee641bdfeb2d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -39,6 +39,8 @@ private[spark] object SQLConf {
   val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
   val PARQUET_USE_DATA_SOURCE_API = "spark.sql.parquet.useDataSourceApi"
 
+  val HIVE_VERIFY_PARTITIONPATH = "spark.sql.hive.verifyPartitionPath"
+
   val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
   val BROADCAST_TIMEOUT = "spark.sql.broadcastTimeout"
 
@@ -119,6 +121,10 @@ private[sql] class SQLConf extends Serializable {
   private[spark] def parquetUseDataSourceApi =
     getConf(PARQUET_USE_DATA_SOURCE_API, "true").toBoolean
 
+  /** When true, uses verifyPartitionPath to prune partition paths that do not exist. */
+  private[spark] def verifyPartitionPath =
+    getConf(HIVE_VERIFY_PARTITIONPATH, "true").toBoolean
+
   /** When true the planner will use the external sort, which may spill to disk. */
   private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, "false").toBoolean
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index b451713279381..b871c58e545f7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -147,7 +147,7 @@ class HadoopTableReader(
     def verifyPartitionPath(
         partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]]):
       Map[HivePartition, Class[_ <: Deserializer]] = {
-      if (!sc.getConf("spark.sql.hive.verifyPartitionPath", "true").toBoolean) {
+      if (!sc.conf.verifyPartitionPath) {
        partitionToDeserializer
       } else {
        var existPathSet = collection.mutable.Set[String]()
@@ -158,9 +158,9 @@ class HadoopTableReader(
               val pathPattern = new Path(pathPatternStr)
               val fs = pathPattern.getFileSystem(sc.hiveconf)
               val matches = fs.globStatus(pathPattern)
-              matches.map(fileStatus => existPathSet += fileStatus.getPath.toString)
+              matches.foreach(fileStatus => existPathSet += fileStatus.getPath.toString)
             }
-            // convert /demo/data/year/month/day to /demo/data/**/**/**/
+            // convert /demo/data/year/month/day to /demo/data/*/*/*/
             def getPathPatternByPath(parNum: Int, tempPath: Path): String = {
               var path = tempPath
               for (i <- (1 to parNum)) path = path.getParent
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
index 8b90bfb19fde8..83f97128c5e83 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
@@ -61,4 +61,4 @@ class QueryPartitionSuite extends QueryTest {
     sql("DROP TABLE table_with_partition")
     sql("DROP TABLE createAndInsertTest")
   }
-}
\ No newline at end of file
+}
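
With the series applied, the flag lives in SQLConf next to the other spark.sql.* settings, so it can also be flipped per session with setConf. A sketch of the scenario the suite above pins down, reusing the table name from the test; the exact failure with verification disabled depends on the Hadoop input format, typically an "Input path does not exist" InvalidInputException:

    import org.apache.spark.sql.hive.test.TestHive

    // Default behaviour: a partition directory deleted behind the metastore's
    // back is pruned before the Hadoop RDD is built, and the query succeeds.
    TestHive.setConf("spark.sql.hive.verifyPartitionPath", "true")
    TestHive.sql("SELECT key, value FROM table_with_partition").collect()

    // Opting out restores the old behaviour: the scan fails on the missing path.
    TestHive.setConf("spark.sql.hive.verifyPartitionPath", "false")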