From 0acd3ad929f18706e05b5deba0e5acae8067c7a1 Mon Sep 17 00:00:00 2001
From: Aaron Davidson
Date: Mon, 27 Jul 2015 22:42:59 -0700
Subject: [PATCH 1/2] [SPARK-9397] DataFrame should provide an API to find
 source data files if applicable

Certain applications would benefit from being able to inspect DataFrames
that are produced directly by file-based data sources and to find out
their source data. For example, one might want to display to a user the
size of the data underlying a table, or to copy or mutate it. This PR
exposes a `sourcePaths` method on DataFrame which attempts to discover
the source data in a best-effort manner, by inspecting HadoopFsRelations
and JSONRelations.
---
 .../org/apache/spark/sql/DataFrame.scala      | 20 ++++++++++++++++++--
 .../org/apache/spark/sql/DataFrameSuite.scala | 20 ++++++++++++++++++++
 .../spark/sql/hive/HiveMetastoreCatalog.scala |  6 +++---
 3 files changed, 41 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 114ab91d10aa0..db1e80c2f61a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -40,8 +40,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, _}
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
 import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD}
-import org.apache.spark.sql.execution.datasources.CreateTableUsingAsSelect
-import org.apache.spark.sql.json.JacksonGenerator
+import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
+import org.apache.spark.sql.json.{JacksonGenerator, JSONRelation}
+import org.apache.spark.sql.sources.HadoopFsRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
@@ -1546,6 +1547,21 @@ class DataFrame private[sql](
     }
   }
 
+  /**
+   * Returns a best-effort snapshot of the files that compose this DataFrame. This method simply
+   * asks each constituent BaseRelation for its respective files and takes the union of all results.
+   * Depending on the source relations, this may not find all input paths. Duplicates are removed.
+   */
+  def sourcePaths: Array[String] = {
+    val paths: Seq[String] = logicalPlan.collect {
+      case LogicalRelation(fsBasedRelation: HadoopFsRelation) =>
+        fsBasedRelation.paths.toSeq
+      case LogicalRelation(jsonRelation: JSONRelation) =>
+        jsonRelation.path.toSeq
+    }.flatten
+    paths.toSet.toArray
+  }
+
   ////////////////////////////////////////////////////////////////////////////
   // for Python API
   ////////////////////////////////////////////////////////////////////////////
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index f67f2c60c0e16..3ae043f4ad418 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -23,7 +23,10 @@ import scala.language.postfixOps
 import scala.util.Random
 
 import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.json.JSONRelation
+import org.apache.spark.sql.parquet.ParquetRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint, SQLTestUtils}
 
@@ -491,6 +494,23 @@ class DataFrameSuite extends QueryTest with SQLTestUtils {
     checkAnswer(df.select(df("key")), testData.select('key).collect().toSeq)
   }
 
+  test("sourcePaths") {
+    val fakeRelation1 = new ParquetRelation(Array("/my/path", "/my/other/path"),
+      Some(testData.schema), None, Map.empty)(sqlContext)
+    val df1 = DataFrame(sqlContext, LogicalRelation(fakeRelation1))
+    assert(df1.sourcePaths.toSet == fakeRelation1.paths.toSet)
+
+    val fakeRelation2 = new JSONRelation("/json/path", 1, Some(testData.schema), sqlContext)
+    val df2 = DataFrame(sqlContext, LogicalRelation(fakeRelation2))
+    assert(df2.sourcePaths.toSet == fakeRelation2.path.toSet)
+
+    val unionDF = df1.unionAll(df2)
+    assert(unionDF.sourcePaths.toSet == fakeRelation1.paths.toSet ++ fakeRelation2.path)
+
+    val filtered = df1.filter("false").unionAll(df2.intersect(df2))
+    assert(filtered.sourcePaths.toSet == fakeRelation1.paths.toSet ++ fakeRelation2.path)
+  }
+
   ignore("show") {
     // This test case is intended ignored, but to make sure it compiles correctly
     testData.select($"*").show()
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 3180c05445c9f..a8c9b4fa71b99 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -274,9 +274,9 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
     val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
 
-    // NOTE: Instead of passing Metastore schema directly to `ParquetRelation2`, we have to
+    // NOTE: Instead of passing Metastore schema directly to `ParquetRelation`, we have to
     // serialize the Metastore schema to JSON and pass it as a data source option because of the
-    // evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
+    // evil case insensitivity issue, which is reconciled within `ParquetRelation`.
     val parquetOptions = Map(
       ParquetRelation.METASTORE_SCHEMA -> metastoreSchema.json,
       ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString)
@@ -290,7 +290,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
       partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
     cachedDataSourceTables.getIfPresent(tableIdentifier) match {
       case null => None // Cache miss
-      case logical@LogicalRelation(parquetRelation: ParquetRelation) =>
+      case logical @ LogicalRelation(parquetRelation: ParquetRelation) =>
         // If we have the same paths, same schema, and same partition spec,
         // we will use the cached Parquet Relation.
         val useCached =

From ff674306ddf60ee1821a25387cd82190c04dfa96 Mon Sep 17 00:00:00 2001
From: Aaron Davidson
Date: Tue, 28 Jul 2015 07:44:11 -0700
Subject: [PATCH 2/2] inputFiles

---
 .../main/scala/org/apache/spark/sql/DataFrame.scala |  8 ++++----
 .../scala/org/apache/spark/sql/DataFrameSuite.scala | 10 +++++-----
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index db1e80c2f61a5..3ea0f9ed3bddd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1550,16 +1550,16 @@ class DataFrame private[sql](
   /**
    * Returns a best-effort snapshot of the files that compose this DataFrame. This method simply
    * asks each constituent BaseRelation for its respective files and takes the union of all results.
-   * Depending on the source relations, this may not find all input paths. Duplicates are removed.
+   * Depending on the source relations, this may not find all input files. Duplicates are removed.
*/ - def sourcePaths: Array[String] = { - val paths: Seq[String] = logicalPlan.collect { + def inputFiles: Array[String] = { + val files: Seq[String] = logicalPlan.collect { case LogicalRelation(fsBasedRelation: HadoopFsRelation) => fsBasedRelation.paths.toSeq case LogicalRelation(jsonRelation: JSONRelation) => jsonRelation.path.toSeq }.flatten - paths.toSet.toArray + files.toSet.toArray } //////////////////////////////////////////////////////////////////////////// diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 3ae043f4ad418..3151e071b19ea 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -494,21 +494,21 @@ class DataFrameSuite extends QueryTest with SQLTestUtils { checkAnswer(df.select(df("key")), testData.select('key).collect().toSeq) } - test("sourcePaths") { + test("inputFiles") { val fakeRelation1 = new ParquetRelation(Array("/my/path", "/my/other/path"), Some(testData.schema), None, Map.empty)(sqlContext) val df1 = DataFrame(sqlContext, LogicalRelation(fakeRelation1)) - assert(df1.sourcePaths.toSet == fakeRelation1.paths.toSet) + assert(df1.inputFiles.toSet == fakeRelation1.paths.toSet) val fakeRelation2 = new JSONRelation("/json/path", 1, Some(testData.schema), sqlContext) val df2 = DataFrame(sqlContext, LogicalRelation(fakeRelation2)) - assert(df2.sourcePaths.toSet == fakeRelation2.path.toSet) + assert(df2.inputFiles.toSet == fakeRelation2.path.toSet) val unionDF = df1.unionAll(df2) - assert(unionDF.sourcePaths.toSet == fakeRelation1.paths.toSet ++ fakeRelation2.path) + assert(unionDF.inputFiles.toSet == fakeRelation1.paths.toSet ++ fakeRelation2.path) val filtered = df1.filter("false").unionAll(df2.intersect(df2)) - assert(filtered.sourcePaths.toSet == fakeRelation1.paths.toSet ++ fakeRelation2.path) + assert(filtered.inputFiles.toSet == fakeRelation1.paths.toSet ++ fakeRelation2.path) } ignore("show") {