From e2aa8ba30dae54761e0dd0870389a2604cf35efb Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Sun, 8 Nov 2015 14:34:08 -0800 Subject: [PATCH 1/6] [SPARK-11544] sqlContext doesn't use PathFilter --- .../apache/spark/sql/sources/interfaces.scala | 29 ++++++++-- .../datasources/json/JsonSuite.scala | 54 ++++++++++++++++++- 2 files changed, 77 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 5b8841bc154a5..c8e16334eb86c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -21,7 +21,8 @@ import scala.collection.mutable import scala.util.Try import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hadoop.fs.{PathFilter, FileStatus, FileSystem, Path} +import org.apache.hadoop.mapred.{JobConf, FileInputFormat} import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.spark.{Logging, SparkContext} @@ -441,8 +442,16 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio val fs = hdfsPath.getFileSystem(hadoopConf) val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - logInfo(s"Listing $qualified on driver") - Try(fs.listStatus(qualified)).getOrElse(Array.empty) + logInfo(s"Listing $qualified on dr iver") + // Dummy jobconf to get to the pathFilter defined in configuration + val jobConf = new JobConf(hadoopConf, this.getClass()) + val pathFilter = FileInputFormat.getInputPathFilter(jobConf) + if (pathFilter != null) { + Try(fs.listStatus(qualified, pathFilter)).getOrElse(Array.empty) + } + else { + Try(fs.listStatus(qualified)).getOrElse(Array.empty) + } }.filterNot { status => val name = status.getPath.getName name.toLowerCase == "_temporary" || name.startsWith(".") @@ -814,8 +823,18 @@ private[sql] object HadoopFsRelation extends Logging { if (name == "_temporary" || name.startsWith(".")) { Array.empty } else { - val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir) - files ++ dirs.flatMap(dir => listLeafFiles(fs, dir)) + // Dummy jobconf to get to the pathFilter defined in configuration + val jobConf = new JobConf(fs.getConf, this.getClass()) + val pathFilter = FileInputFormat.getInputPathFilter(jobConf) + if (pathFilter != null) { + val (dirs, files) = fs.listStatus(status.getPath, pathFilter).partition(_.isDir) + files ++ dirs.flatMap(dir => listLeafFiles(fs, dir)) + } + else + { + val (dirs, files) = fs.listStatus(status.getPath, pathFilter).partition(_.isDir) + files ++ dirs.flatMap(dir => listLeafFiles(fs, dir)) + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 28b8f02bdf87f..52227414d3868 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -17,10 +17,12 @@ package org.apache.spark.sql.execution.datasources.json -import java.io.{File, StringWriter} +import java.io.{FilenameFilter, File, StringWriter} import java.sql.{Date, Timestamp} import com.fasterxml.jackson.core.JsonFactory +import org.apache.commons.io.FileUtils +import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.spark.rdd.RDD 
import org.scalactic.Tolerance._ @@ -32,6 +34,10 @@ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ import org.apache.spark.util.Utils +class TmpFileFilter extends PathFilter { + override def accept(path : Path): Boolean = !path.getName.endsWith(".tmp") +} + class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { import testImplicits._ @@ -1393,4 +1399,50 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { ) } } + test("SPARK-11544 test pathfilter") { + def filterFunc(file: File): Boolean = { + val name = file.getCanonicalFile.getName() + val ret = !(name.startsWith(".") || name.startsWith("_")) + ret.asInstanceOf[Boolean] + } + + val dir = Utils.createTempDir() + dir.delete() + val path = dir.getCanonicalPath + val emps = Seq( + Row(1, "Emp-1"), + Row(2, "Emp-2") + ) + + val empRows = sqlContext.sparkContext.parallelize(emps, 1) + val empSchema = StructType( + Seq( + StructField("id", IntegerType, true), + StructField("name", StringType, true) + ) + ) + val empDF = sqlContext.createDataFrame(empRows, empSchema) + empDF.write.json(path) + val files = dir.listFiles().filter(filterFunc(_)) + files.map(_.renameTo(new File(path + File.separator + "pathmap.tmp"))) + FileUtils.copyFile(new File(path + File.separator + "pathmap.tmp"), + new File(path + File.separator + "data.json")) + + // Both the files should be read and count should be 4 + val empDF2 = sqlContext.read.json(path) + empDF2.registerTempTable("jsonTable1") + checkAnswer( + sql("select count(*) from jsonTable1"), + Row(4) + ) + // After applying the path filter, only one file should be read and the count should be 2 + sqlContext.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class", + classOf[TmpFileFilter], classOf[PathFilter]) + val empDF3 = sqlContext.read.json(path) + empDF3.registerTempTable("jsonTable2") + checkAnswer( + sql("select count(*) from jsonTable2"), + Row(2) + ) + } } From fba713a1520e0f44249184c891527e89553f0a65 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Wed, 11 Nov 2015 23:58:23 -0800 Subject: [PATCH 2/6] code review comments --- .../src/main/scala/org/apache/spark/sql/sources/interfaces.scala | 1 - .../apache/spark/sql/execution/datasources/json/JsonSuite.scala | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index c8e16334eb86c..136ba1dd028b8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -441,7 +441,6 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio val hdfsPath = new Path(path) val fs = hdfsPath.getFileSystem(hadoopConf) val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - logInfo(s"Listing $qualified on dr iver") // Dummy jobconf to get to the pathFilter defined in configuration val jobConf = new JobConf(hadoopConf, this.getClass()) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 52227414d3868..e8630b6573024 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1399,6 +1399,7 @@ 
class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { ) } } + test("SPARK-11544 test pathfilter") { def filterFunc(file: File): Boolean = { val name = file.getCanonicalFile.getName() From 85d4e3dee1a6c525faf5e5702c051bb02ed41898 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Sun, 15 Nov 2015 22:06:48 -0800 Subject: [PATCH 3/6] Code review comments from Yin --- .../apache/spark/sql/sources/interfaces.scala | 7 +- .../datasources/json/JsonSuite.scala | 83 ++++++++++--------- 2 files changed, 49 insertions(+), 41 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 136ba1dd028b8..ccc8f74e0889e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -441,14 +441,13 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio val hdfsPath = new Path(path) val fs = hdfsPath.getFileSystem(hadoopConf) val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - logInfo(s"Listing $qualified on dr iver") + logInfo(s"Listing $qualified on driver") // Dummy jobconf to get to the pathFilter defined in configuration val jobConf = new JobConf(hadoopConf, this.getClass()) val pathFilter = FileInputFormat.getInputPathFilter(jobConf) if (pathFilter != null) { - Try(fs.listStatus(qualified, pathFilter)).getOrElse(Array.empty) - } - else { + Try(fs.listStatus(qualified, pathFilter)) .getOrElse(Array.empty) + } else { Try(fs.listStatus(qualified)).getOrElse(Array.empty) } }.filterNot { status => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index e8630b6573024..294878f37a720 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -17,18 +17,20 @@ package org.apache.spark.sql.execution.datasources.json -import java.io.{FilenameFilter, File, StringWriter} +import java.io.{File, StringWriter} import java.sql.{Date, Timestamp} +import scala.collection.JavaConverters._ import com.fasterxml.jackson.core.JsonFactory import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, PathFilter} -import org.apache.spark.rdd.RDD import org.scalactic.Tolerance._ +import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation} +import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource} import org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -1407,43 +1409,50 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { ret.asInstanceOf[Boolean] } - val dir = Utils.createTempDir() - dir.delete() - val path = dir.getCanonicalPath - val emps = Seq( - Row(1, "Emp-1"), - Row(2, "Emp-2") - ) + val clonedConf = new Configuration(hadoopConfiguration) + try { + val dir = Utils.createTempDir() + dir.delete() + val path = dir.getCanonicalPath + val emps = Seq( + Row(1, "Emp-1"), + Row(2, "Emp-2") + ) - val empRows = 
sqlContext.sparkContext.parallelize(emps, 1) - val empSchema = StructType( - Seq( - StructField("id", IntegerType, true), - StructField("name", StringType, true) + val empRows = sqlContext.sparkContext.parallelize(emps, 1) + val empSchema = StructType( + Seq( + StructField("id", IntegerType, true), + StructField("name", StringType, true) + ) ) - ) - val empDF = sqlContext.createDataFrame(empRows, empSchema) - empDF.write.json(path) - val files = dir.listFiles().filter(filterFunc(_)) - files.map(_.renameTo(new File(path + File.separator + "pathmap.tmp"))) - FileUtils.copyFile(new File(path + File.separator + "pathmap.tmp"), + val empDF = sqlContext.createDataFrame(empRows, empSchema) + empDF.write.json(path) + val files = dir.listFiles().filter(filterFunc(_)) + files.map(_.renameTo(new File(path + File.separator + "pathmap.tmp"))) + FileUtils.copyFile(new File(path + File.separator + "pathmap.tmp"), new File(path + File.separator + "data.json")) - // Both the files should be read and count should be 4 - val empDF2 = sqlContext.read.json(path) - empDF2.registerTempTable("jsonTable1") - checkAnswer( - sql("select count(*) from jsonTable1"), - Row(4) - ) - // After applying the path filter, only one file should be read and the count should be 2 - sqlContext.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class", - classOf[TmpFileFilter], classOf[PathFilter]) - val empDF3 = sqlContext.read.json(path) - empDF3.registerTempTable("jsonTable2") - checkAnswer( - sql("select count(*) from jsonTable2"), - Row(2) - ) + // Both the files should be read and count should be 4 + val empDF2 = sqlContext.read.json(path) + empDF2.registerTempTable("jsonTable1") + checkAnswer( + sql("select count(*) from jsonTable1"), + Row(4) + ) + // After applying the path filter, only one file should be read and the count should be 2 + sqlContext.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class", + classOf[TmpFileFilter], classOf[PathFilter]) + val empDF3 = sqlContext.read.json(path) + empDF3.registerTempTable("jsonTable2") + checkAnswer( + sql("select count(*) from jsonTable2"), + Row(2) + ) + } finally { + // Hadoop 1 doesn't have `Configuration.unset` + hadoopConfiguration.clear() + clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue)) + } } } From e69c5382eca39c0ef9c912268f10f160f41dca52 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Sun, 15 Nov 2015 22:44:09 -0800 Subject: [PATCH 4/6] Fix test failure --- .../main/scala/org/apache/spark/sql/sources/interfaces.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index ccc8f74e0889e..bc3993ebb429d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -830,7 +830,7 @@ private[sql] object HadoopFsRelation extends Logging { } else { - val (dirs, files) = fs.listStatus(status.getPath, pathFilter).partition(_.isDir) + val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir) files ++ dirs.flatMap(dir => listLeafFiles(fs, dir)) } } From 144cdb02224e986e8f0e13e8c0a6ce09140335cf Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Mon, 16 Nov 2015 11:39:18 -0800 Subject: [PATCH 5/6] Code review comments from Lian --- .../apache/spark/sql/sources/interfaces.scala | 6 +- .../datasources/json/JsonSuite.scala | 70 
++++++------------- 2 files changed, 22 insertions(+), 54 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index bc3993ebb429d..fb7507c53357d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -446,7 +446,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio val jobConf = new JobConf(hadoopConf, this.getClass()) val pathFilter = FileInputFormat.getInputPathFilter(jobConf) if (pathFilter != null) { - Try(fs.listStatus(qualified, pathFilter)) .getOrElse(Array.empty) + Try(fs.listStatus(qualified, pathFilter)).getOrElse(Array.empty) } else { Try(fs.listStatus(qualified)).getOrElse(Array.empty) } @@ -827,9 +827,7 @@ private[sql] object HadoopFsRelation extends Logging { if (pathFilter != null) { val (dirs, files) = fs.listStatus(status.getPath, pathFilter).partition(_.isDir) files ++ dirs.flatMap(dir => listLeafFiles(fs, dir)) - } - else - { + } else { val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir) files ++ dirs.flatMap(dir => listLeafFiles(fs, dir)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 294878f37a720..92d454a120d23 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -36,8 +36,8 @@ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ import org.apache.spark.util.Utils -class TmpFileFilter extends PathFilter { - override def accept(path : Path): Boolean = !path.getName.endsWith(".tmp") +class TestFileFilter extends PathFilter { + override def accept(path: Path): Boolean = path.getParent.getName != "p=2" } class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { @@ -1403,56 +1403,26 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("SPARK-11544 test pathfilter") { - def filterFunc(file: File): Boolean = { - val name = file.getCanonicalFile.getName() - val ret = !(name.startsWith(".") || name.startsWith("_")) - ret.asInstanceOf[Boolean] - } - - val clonedConf = new Configuration(hadoopConfiguration) - try { - val dir = Utils.createTempDir() - dir.delete() + withTempPath { dir => val path = dir.getCanonicalPath - val emps = Seq( - Row(1, "Emp-1"), - Row(2, "Emp-2") - ) - val empRows = sqlContext.sparkContext.parallelize(emps, 1) - val empSchema = StructType( - Seq( - StructField("id", IntegerType, true), - StructField("name", StringType, true) - ) - ) - val empDF = sqlContext.createDataFrame(empRows, empSchema) - empDF.write.json(path) - val files = dir.listFiles().filter(filterFunc(_)) - files.map(_.renameTo(new File(path + File.separator + "pathmap.tmp"))) - FileUtils.copyFile(new File(path + File.separator + "pathmap.tmp"), - new File(path + File.separator + "data.json")) - - // Both the files should be read and count should be 4 - val empDF2 = sqlContext.read.json(path) - empDF2.registerTempTable("jsonTable1") - checkAnswer( - sql("select count(*) from jsonTable1"), - Row(4) - ) - // After applying the path filter, only one file should be read and the count should be 2 - 
sqlContext.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class", - classOf[TmpFileFilter], classOf[PathFilter]) - val empDF3 = sqlContext.read.json(path) - empDF3.registerTempTable("jsonTable2") - checkAnswer( - sql("select count(*) from jsonTable2"), - Row(2) - ) - } finally { - // Hadoop 1 doesn't have `Configuration.unset` - hadoopConfiguration.clear() - clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue)) + val df = sqlContext.range(2) + df.write.json(path + "/p=1") + df.write.json(path + "/p=2") + assert(sqlContext.read.json(path).count() === 4) + + val clonedConf = new Configuration(hadoopConfiguration) + try { + hadoopConfiguration.setClass( + "mapreduce.input.pathFilter.class", + classOf[TestFileFilter], + classOf[PathFilter]) + assert(sqlContext.read.json(path).count() === 2) + } finally { + // Hadoop 1 doesn't have `Configuration.unset` + hadoopConfiguration.clear() + clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue)) + } } } } From f6df5c8a333da1ad8029c6b018dc52db4c2c488a Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Wed, 18 Nov 2015 19:28:49 -0800 Subject: [PATCH 6/6] Fix test failures against older version of hadoop --- .../spark/sql/execution/datasources/json/JsonSuite.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 92d454a120d23..47421d5d18c36 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1413,6 +1413,11 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val clonedConf = new Configuration(hadoopConfiguration) try { + // Setting it twice as the name of the propery has changed between hadoop versions. + hadoopConfiguration.setClass( + "mapred.input.pathFilter.class", + classOf[TestFileFilter], + classOf[PathFilter]) hadoopConfiguration.setClass( "mapreduce.input.pathFilter.class", classOf[TestFileFilter],
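
Taken together, these patches make HadoopFsRelation honor a Hadoop PathFilter configured through the standard MapReduce property: the listing code wraps the Hadoop configuration in a dummy JobConf, retrieves the filter with FileInputFormat.getInputPathFilter, and passes it to fs.listStatus both in the driver-side listing and in listLeafFiles. A minimal usage sketch follows; the SkipTmpFiles class and the input path are illustrative assumptions (not part of the patches), sqlContext is the one provided by the Spark shell, and on older Hadoop versions the property name is "mapred.input.pathFilter.class", as patch 6 notes.

    import org.apache.hadoop.fs.{Path, PathFilter}

    // Exclude temporary files from input listings; registered via the Hadoop configuration.
    class SkipTmpFiles extends PathFilter {
      override def accept(path: Path): Boolean = !path.getName.endsWith(".tmp")
    }

    // With these patches applied, reads through sqlContext honor the configured filter.
    sqlContext.sparkContext.hadoopConfiguration.setClass(
      "mapreduce.input.pathFilter.class", classOf[SkipTmpFiles], classOf[PathFilter])
    val df = sqlContext.read.json("/data/events")  // files ending in ".tmp" are skipped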