From c274289c704d1ed055770f5b988ba8a4271479a8 Mon Sep 17 00:00:00 2001
From: jeanlyn
Date: Mon, 5 Jan 2015 00:26:14 +0800
Subject: [PATCH 1/3] SPARK-5068: fix bug querying data when path doesn't exist

---
 .../apache/spark/sql/hive/TableReader.scala   |  6 ++-
 .../spark/sql/hive/QueryPartitionSuite.scala  | 49 +++++++++++++++++++
 2 files changed, 54 insertions(+), 1 deletion(-)
 create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index c368715f7c6f5..bce835d5ff17b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -141,7 +141,11 @@ class HadoopTableReader(
       partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]],
       filterOpt: Option[PathFilter]): RDD[Row] = {

-    val hivePartitionRDDs = partitionToDeserializer.map { case (partition, partDeserializer) =>
+    val hivePartitionRDDs = partitionToDeserializer.filter{ case (partition, partDeserializer) =>
+      val partPath = HiveShim.getDataLocationPath(partition)
+      val fs = partPath.getFileSystem(sc.hiveconf)
+      fs.exists(partPath)
+    }.map { case (partition, partDeserializer) =>
       val partDesc = Utilities.getPartitionDesc(partition)
       val partPath = HiveShim.getDataLocationPath(partition)
       val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
new file mode 100644
index 0000000000000..576a9bfd61b6b
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
@@ -0,0 +1,49 @@
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import com.google.common.io.Files
+import org.apache.spark.sql.{QueryTest, _}
+import org.apache.spark.sql.hive.test.TestHive
+/* Implicits */
+import org.apache.spark.sql.hive.test.TestHive._
+
+
+class QueryPartitionSuite extends QueryTest {
+
+  test("SPARK-5068: query data when path doesn't exist"){
+    val testData = TestHive.sparkContext.parallelize(
+      (1 to 10).map(i => TestData(i, i.toString)))
+    testData.registerTempTable("testData")
+
+    val tmpDir = Files.createTempDir()
+    //create the table for the test
+    sql(s"CREATE TABLE table_with_partition(key int,value string) PARTITIONED by (ds string) location '${tmpDir.toURI.toString}' ")
+    sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1') SELECT key,value FROM testData")
+    sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='2') SELECT key,value FROM testData")
+    sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='3') SELECT key,value FROM testData")
+    sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='4') SELECT key,value FROM testData")
+    //test for the existing path
+    checkAnswer(sql("select key,value from table_with_partition"),
+      testData.collect.toSeq ++ testData.collect.toSeq ++ testData.collect.toSeq ++ testData.collect.toSeq)
+
+    //delete the path of one partition
+    val folders = tmpDir.listFiles.filter(_.isDirectory).toList
+    def deleteAll(file:File){
+      if(file.isDirectory()){
+        for(f:File <-file.listFiles()){
+          deleteAll(f);
+        }
+      }
+      file.delete();
+    }
+    deleteAll(folders(0))
+
+    //test after deleting the path
+    checkAnswer(sql("select key,value from table_with_partition"),
+      testData.collect.toSeq ++ testData.collect.toSeq ++ testData.collect.toSeq)
+
+    sql("DROP TABLE table_with_partition")
+    sql("DROP TABLE createAndInsertTest")
+  }
+}

From fe4427a4d96d28930ab1d35ee05d400672ccd88e Mon Sep 17 00:00:00 2001
From: jeanlyn
Date: Tue, 6 Jan 2015 16:08:36 +0800
Subject: [PATCH 2/3] catch the exception when the path doesn't exist

---
 .../org/apache/spark/rdd/HadoopRDD.scala      | 27 +++++++++----------
 .../apache/spark/sql/hive/TableReader.scala   |  6 +----
 .../spark/sql/hive/QueryPartitionSuite.scala  | 21 +++++++++++++--
 3 files changed, 33 insertions(+), 21 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 0001c2329c83a..b296028e7ff9a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -26,15 +26,7 @@ import scala.reflect.ClassTag
 import scala.collection.mutable.ListBuffer

 import org.apache.hadoop.conf.{Configurable, Configuration}
-import org.apache.hadoop.mapred.FileSplit
-import org.apache.hadoop.mapred.InputFormat
-import org.apache.hadoop.mapred.InputSplit
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.RecordReader
-import org.apache.hadoop.mapred.Reporter
-import org.apache.hadoop.mapred.JobID
-import org.apache.hadoop.mapred.TaskAttemptID
-import org.apache.hadoop.mapred.TaskID
+import org.apache.hadoop.mapred._
 import org.apache.hadoop.util.ReflectionUtils

 import org.apache.spark._
@@ -198,12 +190,19 @@ class HadoopRDD[K, V](
     if (inputFormat.isInstanceOf[Configurable]) {
       inputFormat.asInstanceOf[Configurable].setConf(jobConf)
     }
-    val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
-    val array = new Array[Partition](inputSplits.size)
-    for (i <- 0 until inputSplits.size) {
-      array(i) = new HadoopPartition(id, i, inputSplits(i))
+    // SPARK-5068: catch the exception when the path does not exist
+    try {
+      val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
+      val array = new Array[Partition](inputSplits.size)
+      for (i <- 0 until inputSplits.size) {
+        array(i) = new HadoopPartition(id, i, inputSplits(i))
+      }
+      array
+    } catch {
+      case e: InvalidInputException => {
+        new Array[Partition](0)
+      }
     }
-    array
   }

   override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index bce835d5ff17b..c368715f7c6f5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -141,11 +141,7 @@ class HadoopTableReader(
       partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]],
       filterOpt: Option[PathFilter]): RDD[Row] = {

-    val hivePartitionRDDs = partitionToDeserializer.filter{ case (partition, partDeserializer) =>
-      val partPath = HiveShim.getDataLocationPath(partition)
-      val fs = partPath.getFileSystem(sc.hiveconf)
-      fs.exists(partPath)
-    }.map { case (partition, partDeserializer) =>
+    val hivePartitionRDDs = partitionToDeserializer.map { case (partition, partDeserializer) =>
       val partDesc = Utilities.getPartitionDesc(partition)
       val partPath = HiveShim.getDataLocationPath(partition)
       val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
index 576a9bfd61b6b..a215b3b430149 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.spark.sql.hive

 import java.io.File
@@ -30,8 +47,8 @@ class QueryPartitionSuite extends QueryTest {
     //delete the path of one partition
     val folders = tmpDir.listFiles.filter(_.isDirectory).toList
     def deleteAll(file:File){
-      if(file.isDirectory()){
-        for(f:File <-file.listFiles()){
+      if (file.isDirectory()) {
+        for (f:File <-file.listFiles()) {
           deleteAll(f);
         }
       }

From d4a5b9ae3fc029619366d017cacf69d0beec3479 Mon Sep 17 00:00:00 2001
From: jeanlyn
Date: Tue, 6 Jan 2015 20:21:51 +0800
Subject: [PATCH 3/3] Add a logWarning when catching the exception.

---
 .../main/scala/org/apache/spark/rdd/HadoopRDD.scala | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index b296028e7ff9a..767e3fcdf0484 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -26,7 +26,16 @@ import scala.reflect.ClassTag
 import scala.collection.mutable.ListBuffer

 import org.apache.hadoop.conf.{Configurable, Configuration}
-import org.apache.hadoop.mapred._
+import org.apache.hadoop.mapred.FileSplit
+import org.apache.hadoop.mapred.InputFormat
+import org.apache.hadoop.mapred.InputSplit
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapred.RecordReader
+import org.apache.hadoop.mapred.Reporter
+import org.apache.hadoop.mapred.JobID
+import org.apache.hadoop.mapred.TaskAttemptID
+import org.apache.hadoop.mapred.TaskID
+import org.apache.hadoop.mapred.InvalidInputException
 import org.apache.hadoop.util.ReflectionUtils

 import org.apache.spark._
@@ -200,6 +209,7 @@ class HadoopRDD[K, V](
       array
     } catch {
       case e: InvalidInputException => {
+        logWarning("InvalidInput!", e)
        new Array[Partition](0)
       }
     }
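
Note for readers of this series: the sketch below illustrates, outside of Spark, the defensive pattern the final revision settles on in HadoopRDD.getPartitions: ask the InputFormat for splits, and if the input path no longer exists, log a warning and fall back to zero partitions instead of failing the query. The SafeSplits object, its method name, and the use of TextInputFormat are illustrative assumptions, not part of the patch; it only assumes the Hadoop mapred API shown in the diffs above.

import org.apache.hadoop.mapred.{InputSplit, InvalidInputException, JobConf, TextInputFormat}

// Hypothetical helper (not from the patch): compute splits the way the patched
// getPartitions does, treating a missing input path as "no splits".
object SafeSplits {
  def getSplitsOrEmpty(jobConf: JobConf, minPartitions: Int): Array[InputSplit] = {
    val inputFormat = new TextInputFormat
    inputFormat.configure(jobConf)
    try {
      inputFormat.getSplits(jobConf, minPartitions)
    } catch {
      // FileInputFormat.getSplits throws InvalidInputException when an input
      // path does not exist; return an empty array instead of failing the job.
      case e: InvalidInputException =>
        System.err.println(s"Ignoring missing input path: ${e.getMessage}")
        Array.empty[InputSplit]
    }
  }
}

With this behavior, a partitioned Hive table whose partition directory was deleted out from under the metastore yields rows from the remaining partitions only, which is exactly what QueryPartitionSuite asserts after deleting one of the four partition folders.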