From d2b4b26823cfdad36d96c28fafc451815ad4614d Mon Sep 17 00:00:00 2001
From: xin Wu
Date: Fri, 6 Nov 2015 15:47:14 -0800
Subject: [PATCH 01/10] SPARK-11522 input_file_name() returns empty string for
 external tables

---
 .../org/apache/spark/rdd/HadoopRDD.scala      |   6 +
 .../sql/hive/execution/HiveUDFSuite.scala     | 134 ++++++++++++++++++
 2 files changed, 140 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 0453614f6a1d3..79f7611c60cae 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -213,6 +213,12 @@ class HadoopRDD[K, V](
       val inputMetrics = context.taskMetrics.getInputMetricsForReadMethod(DataReadMethod.Hadoop)
 
+      // Sets the thread local variable for the file's name
+      split.inputSplit.value match {
+        case fs: FileSplit => SqlNewHadoopRDD.setInputFileName(fs.getPath.toString)
+        case _ => SqlNewHadoopRDD.unsetInputFileName()
+      }
+
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
       // creating RecordReader, because RecordReader's constructor might read some bytes
       val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index 5ab477efc4ee0..ece26fbe9ecb7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -348,6 +348,140 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton {
     sqlContext.dropTempTable("testUDF")
   }
 
+
+  test("SPARK-11522 select input_file_name from non-parquet table"){
+
+    println("======================================")
+    println("EXTERNAL OpenCSVSerde table pointing to LOCATION")
+
+    sql("CREATE EXTERNAL TABLE csv_table(page_id INT, impressions INT) " +
+      " ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'" +
+      " WITH SERDEPROPERTIES ( " +
+      " \"separatorChar\" = \",\"," +
+      " \"quoteChar\" = \"\\\"\"," +
+      " \"escapeChar\" = \"\\\\\") " +
+      " LOCATION 'file:///home/xwu0226/spark-tables/csv_table' ")
+
+    sql("SELECT input_file_name() as file FROM csv_table").show(false)
+    sql("SELECT * FROM csv_table").show()
+    sql("DROP TABLE csv_table")
+
+    println("======================================")
+    println("EXTERNAL pointing to LOCATION")
+    sql("select 1, 2").write.save("file:///home/xwu0226/spark-tables/external_test_t5")
+
+    sql("CREATE EXTERNAL table external_t5 (c1 int, c2 int) " +
+      " row format delimited fields terminated by ',' " +
+      " location 'file:///home/xwu0226/spark-tables/external_test_t5'")
+
+    sql("SELECT input_file_name() as file FROM external_t5").show(false)
+    sql("SELECT * FROM external_t5").show
+    sql("DROP TABLE external_t5")
+
+    println("======================================")
+    println("NON-External pointing to LOCATION")
+
+    sql("select 1, 2").write.save("file:///home/xwu0226/spark-tables/external_test_t4")
+    sql("CREATE table external_t4 (c1 int, c2 int) " +
+      " row format delimited fields terminated by ',' " +
+      " location 'file:///home/xwu0226/spark-tables/external_test_t4'")
+
+    sql("SELECT input_file_name() as file FROM external_t4").show(false)
+    sql("SELECT * FROM external_t4").show
+    sql("DROP TABLE external_t4")
+
+    println("======================================")
+    println("NON-External pointing to /tmp/...")
+
+    sql("CREATE table internal_t1(c1 int, c2 int) " +
+      " row format delimited fields terminated by ',' " +
+      " as select 1, 2")
+
+    sql("SELECT input_file_name() as file FROM internal_t1").show(false)
+    sql("SELECT * FROM internal_t1").show
+    sql("DROP TABLE internal_t1")
+
+    println("======================================")
+    println("External pointing to /tmp/...")
+
+    sql("CREATE EXTERNAL table external_t1(c1 int, c2 int) " +
+      " row format delimited fields terminated by ',' " +
+      " as select 1, 2")
+
+    sql("SELECT input_file_name() as file FROM external_t1").show(false)
+    sql("SELECT * FROM external_t1").show
+    sql("DROP TABLE external_t1")
+
+    println("======================================")
+    println("Non-External parquet pointing to /tmp/...")
+
+    sql("CREATE table internal_parquet_tmp(c1 int, c2 int) " +
+      " stored as parquet " +
+      " as select 1, 2")
+
+    sql("SELECT input_file_name() as file FROM internal_parquet_tmp").show(false)
+    sql("SELECT * FROM internal_parquet_tmp").show
+    sql("DROP TABLE internal_parquet_tmp")
+
+    println("======================================")
+    println("External parquet pointing to /tmp/...")
+
+    sql("CREATE EXTERNAL table external_parquet_tmp(c1 int, c2 int) " +
+      " stored as parquet " +
+      " as select 1, 2")
+
+    sql("SELECT input_file_name() as file FROM external_parquet_tmp").show(false)
+    sql("SELECT * FROM external_parquet_tmp").show
+    sql("DROP TABLE external_parquet_tmp")
+
+    println("======================================")
+    println("External parquet pointing to LOCATION")
+
+    sql("CREATE table internal_parquet(c1 int, c2 int) " +
+      " stored as parquet " +
+      " as select 1, 2")
+
+    val df = sql("SELECT * FROM internal_parquet")
+    df.write.parquet("file:///home/xwu0226/spark-tables/external_parquet")
+
+    sql("CREATE EXTERNAL table external_parquet(c1 int, c2 int) " +
+      " stored as parquet " +
+      " LOCATION 'file:///home/xwu0226/spark-tables/external_parquet'")
+
+    sql("SELECT input_file_name() as file FROM external_parquet").show(false)
+    sql("SELECT * FROM external_parquet").show
+    sql("DROP TABLE external_parquet")
+    sql("DROP TABLE internal_parquet")
+
+    println("======================================")
+    println("Non-External parquet pointing to /tmp/...")
+
+    sql("CREATE table internal_parquet_tmp(c1 int, c2 int) " +
+      " stored as parquet " +
+      " as select 1, 2")
+
+    sql("SELECT input_file_name() as file FROM internal_parquet_tmp").show(false)
+    sql("SELECT * FROM internal_parquet_tmp").show
+    sql("DROP TABLE internal_parquet_tmp")
+
+
+    println("======================================")
+    println("Non-External parquet pointing to LOCATION")
+
+    sql("SELECT 1, 2 ").write.parquet("file:///home/xwu0226/spark-tables/NON_external_parquet")
+
+    sql("CREATE table non_external_parquet(c1 int, c2 int) " +
+      " stored as parquet " +
+      " LOCATION 'file:///home/xwu0226/spark-tables/NON_external_parquet'")
+
+    sql("SELECT input_file_name() as file FROM non_external_parquet").show(false)
+    sql("SELECT * FROM non_external_parquet").show
+    sql("DROP TABLE non_external_parquet")
+
+  }
+
+
 }
 
 class TestPair(x: Int, y: Int) extends Writable with Serializable {
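For readers following the fix in PATCH 01: input_file_name() is evaluated against a per-thread value that the reading RDD publishes, which is why HadoopRDD (the code path Hive external tables take) has to set and unset it just as SqlNewHadoopRDD does for data-source tables. A minimal sketch of that thread-local holder pattern — illustrative only, with assumed names, not Spark's actual SqlNewHadoopRDD internals — looks like this:

    // Hypothetical, simplified holder; names and details are assumptions.
    object InputFileNameHolder {
      // One slot per task thread; empty string means "no file being read".
      private val inputFileName = new ThreadLocal[String] {
        override def initialValue(): String = ""
      }

      // Read side: the input_file_name() expression returns this value.
      def getInputFileName(): String = inputFileName.get()

      // Write side: the RDD sets this before handing records downstream.
      def setInputFileName(file: String): Unit = inputFileName.set(file)

      // Clears the slot so a reused executor thread does not report a
      // stale file name for the next task.
      def unsetInputFileName(): Unit = inputFileName.remove()
    }

Because the holder is thread-local, setting it in HadoopRDD.compute is enough for the expression to observe the right value on the same task thread.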
From aafff4a40183272cea99971d9a5f8029e2bddd8d Mon Sep 17 00:00:00 2001
From: xin Wu
Date: Fri, 6 Nov 2015 18:56:06 -0800
Subject: [PATCH 02/10] SPARK-11522 updating testcases

---
 .../resources/data/files/csv_table/data1.csv  |   1 +
 .../resources/data/files/csv_table/data2.csv  |   1 +
 .../data/files/external_parquet/_SUCCESS      |   0
 .../files/external_parquet/_common_metadata   | Bin 0 -> 320 bytes
 .../data/files/external_parquet/_metadata     | Bin 0 -> 542 bytes
 ...ae2-f8ef-4c73-9cae-399570a999a5.gz.parquet | Bin 0 -> 540 bytes
 .../resources/data/files/external_t5/data.txt |   1 +
 .../sql/hive/execution/HiveUDFSuite.scala     | 142 ++++++------------
 8 files changed, 47 insertions(+), 98 deletions(-)
 create mode 100644 sql/hive/src/test/resources/data/files/csv_table/data1.csv
 create mode 100644 sql/hive/src/test/resources/data/files/csv_table/data2.csv
 create mode 100644 sql/hive/src/test/resources/data/files/external_parquet/_SUCCESS
 create mode 100644 sql/hive/src/test/resources/data/files/external_parquet/_common_metadata
 create mode 100644 sql/hive/src/test/resources/data/files/external_parquet/_metadata
 create mode 100644 sql/hive/src/test/resources/data/files/external_parquet/part-r-00000-12ba2ae2-f8ef-4c73-9cae-399570a999a5.gz.parquet
 create mode 100644 sql/hive/src/test/resources/data/files/external_t5/data.txt

diff --git a/sql/hive/src/test/resources/data/files/csv_table/data1.csv b/sql/hive/src/test/resources/data/files/csv_table/data1.csv
new file mode 100644
index 0000000000000..0e88c499a701c
--- /dev/null
+++ b/sql/hive/src/test/resources/data/files/csv_table/data1.csv
@@ -0,0 +1 @@
+1,2
\ No newline at end of file
diff --git a/sql/hive/src/test/resources/data/files/csv_table/data2.csv b/sql/hive/src/test/resources/data/files/csv_table/data2.csv
new file mode 100644
index 0000000000000..7208c2182900c
--- /dev/null
+++ b/sql/hive/src/test/resources/data/files/csv_table/data2.csv
@@ -0,0 +1 @@
+2.4
\ No newline at end of file
diff --git a/sql/hive/src/test/resources/data/files/external_parquet/_SUCCESS b/sql/hive/src/test/resources/data/files/external_parquet/_SUCCESS
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/sql/hive/src/test/resources/data/files/external_parquet/_common_metadata b/sql/hive/src/test/resources/data/files/external_parquet/_common_metadata
new file mode 100644
index 0000000000000000000000000000000000000000..60483460a89acd5cadcfdb2970a83593b7b2596b
GIT binary patch
literal 320
zcmb7=K}*9x5QVqVW3Q0}Jd_2&LR=cNi6MIN=CODcDbw9a4Q|qOcY{dC|M2HJN)@l3
zW_a_ydBd#lAJZ_1?;hr^2m88l>IGW}PgxjT1kuSxt$w?dow0ZxpGV8R-3jP{tm2yz
z?qCGrhXK8?`!~^|2Mr#gkJ-rmdyl;0&f7tGp76RsqaClFM&3dDHKG*KkG`h!xI;@q
zHy8t3Lk>bQ;Xhk>HBM9Vhfe=ZKTj;WJzjj(+QsE|XpCND*Gd+J%u1OflyI}kRhDas
Ws;pI!OGULV(?TzqoH4eh-+cn#9APB@

literal 0
HcmV?d00001

diff --git a/sql/hive/src/test/resources/data/files/external_parquet/_metadata b/sql/hive/src/test/resources/data/files/external_parquet/_metadata
new file mode 100644
index 0000000000000000000000000000000000000000..ffdf6760bef965dafad5290abd137936fda38b5d
GIT binary patch
literal 542
zcmbu7O-sW-5QaCgNDrQbED7{b78DEFknbk77jGVm2QMOJy4loX(vQssv89*(3V(z@
z(rs*u9>qP(4zu&lGra8R?!hy3Yj}TAEkV4FEB3-mFj~mauXOWl%-mDvA*@>+%QlCq
zQ&K_Q{h^*e0Ux+e^MvO#Wc`4q3^)zaH0ryMrYS_ubnPh7s^${6JBk;f_PN!JH(FT7
zk9d9E9-1u;A@m)w%FsVaJM9{@IKbII=r@?
zY@}k{(9Mcs5UjgMDd|fCkyWf-sP`46wZUAONk7c?+B}oM{R>ikkyMOyL#vLGzQx82
zV5}vVl};iH04KS~tYma+#Uf(?%b4~o6*RjW3)-YF?T%A
jsr!yAn9mwDm)G6E5!?^C=Q

+      sql("SELECT 1, 2 ").write.parquet(dir.getCanonicalPath)
 
-    sql("SELECT 1, 2 ").write.parquet("file:///home/xwu0226/spark-tables/NON_external_parquet")
-
-    sql("CREATE table non_external_parquet(c1 int, c2 int) " +
-      " stored as parquet " +
-      " LOCATION 'file:///home/xwu0226/spark-tables/NON_external_parquet'")
+      sql(s"""CREATE table non_external_parquet(c1 int, c2 int)
+             stored as parquet
+             LOCATION '${dir.getCanonicalPath}'""")
 
-    sql("SELECT input_file_name() as file FROM non_external_parquet").show(false)
-    sql("SELECT * FROM non_external_parquet").show
-    sql("DROP TABLE non_external_parquet")
+      val answer5 = sql("SELECT input_file_name() as file FROM non_external_parquet").head().getString(0)
+      assert(answer5.contains(dir.getCanonicalPath))
+      assert(sql("SELECT input_file_name() as file FROM non_external_parquet").distinct().collect().length == 1)
+      sql("DROP TABLE non_external_parquet")
+    }
 }
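The assertions introduced in PATCH 02 pin down the expected semantics: every row reports the file it was read from, and a table backed by two files yields exactly two distinct values. A hypothetical spark-shell session (table name, output, and paths are illustrative) makes the contract concrete:

    // Hypothetical session; csv_table is assumed to be backed by two files.
    val files = sqlContext.sql("SELECT input_file_name() AS file FROM csv_table")
    files.show(false)
    // +------------------------------+
    // |file                          |
    // +------------------------------+
    // |file:/.../csv_table/data1.csv |
    // |file:/.../csv_table/data2.csv |
    // +------------------------------+
    assert(files.distinct().count() == 2) // one distinct name per input file

Before the fix, the same query against a Hive external table returned empty strings, because HadoopRDD never populated the thread-local value that the expression reads.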
From 14bcd1d3d5a594491337f753eaab0832edd1d3d9 Mon Sep 17 00:00:00 2001
From: xin Wu
Date: Fri, 6 Nov 2015 21:06:14 -0800
Subject: [PATCH 03/10] SPARK-11522 update testcase

---
 .../org/apache/spark/sql/hive/execution/HiveUDFSuite.scala | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index a2565a9cbb97c..31e21c0a3c998 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -28,7 +28,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn
 import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory}
 import org.apache.hadoop.hive.serde2.{AbstractSerDe, SerDeStats}
 import org.apache.hadoop.io.Writable
-import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.util.Utils
@@ -45,7 +44,7 @@ case class ListStringCaseClass(l: Seq[String])
 /**
  * A test suite for Hive custom UDFs.
  */
-class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils{
+class HiveUDFSuite extends QueryTest with TestHiveSingleton {
 
   import hiveContext.{udf, sql}
   import hiveContext.implicits._
@@ -409,7 +408,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils{
     assert(sql("SELECT input_file_name() as file FROM internal_parquet_tmp").distinct().collect().length == 1)
     sql("DROP TABLE internal_parquet_tmp")
 
-
+    /*
     println("======================================")
     println("Non-External parquet pointing to LOCATION")
     withTempPath { dir =>
@@ -424,7 +423,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils{
       assert(sql("SELECT input_file_name() as file FROM non_external_parquet").distinct().collect().length == 1)
       sql("DROP TABLE non_external_parquet")
     }
-
+    */
 
 }

From d74dfd8c5d73389d169ada3896d75ad65da8df04 Mon Sep 17 00:00:00 2001
From: xin Wu
Date: Sat, 7 Nov 2015 09:09:24 -0800
Subject: [PATCH 04/10] SPARK-11522 update testcase

---
 .../spark/sql/hive/execution/HiveUDFSuite.scala | 16 ----------------
 1 file changed, 16 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index 31e21c0a3c998..11a2c639e0777 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -408,22 +408,6 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton {
     assert(sql("SELECT input_file_name() as file FROM internal_parquet_tmp").distinct().collect().length == 1)
     sql("DROP TABLE internal_parquet_tmp")
 
-    /*
-    println("======================================")
-    println("Non-External parquet pointing to LOCATION")
-    withTempPath { dir =>
-      sql("SELECT 1, 2 ").write.parquet(dir.getCanonicalPath)
-
-      sql(s"""CREATE table non_external_parquet(c1 int, c2 int)
-             stored as parquet
-             LOCATION '${dir.getCanonicalPath}'""")
-
-      val answer5 = sql("SELECT input_file_name() as file FROM non_external_parquet").head().getString(0)
-      assert(answer5.contains(dir.getCanonicalPath))
-      assert(sql("SELECT input_file_name() as file FROM non_external_parquet").distinct().collect().length == 1)
-      sql("DROP TABLE non_external_parquet")
-    }
-    */
 
 }

From ee6bf1cf6a8ae49e91221309f8ae9aa9839bffd7 Mon Sep 17 00:00:00 2001
From: xin Wu
Date: Sat, 7 Nov 2015 09:12:34 -0800
Subject: [PATCH 05/10] SPARK-11522 update testcase

---
 .../org/apache/spark/sql/hive/execution/HiveUDFSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index 11a2c639e0777..1bd36216ebd3f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -80,7 +80,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton {
       """.
        stripMargin.format(classOf[PairSerDe].getName))
 
-    val location= Utils.getSparkClassLoader.getResource("data/files/testUDF").getFile
+    val location = Utils.getSparkClassLoader.getResource("data/files/testUDF").getFile
     sql(s"""
       ALTER TABLE hiveUDFTestTable
       ADD IF NOT EXISTS PARTITION(partition='testUDF')

From e0066c002d38fd06d31b12c5ee4732ba61341d0e Mon Sep 17 00:00:00 2001
From: xin Wu
Date: Sun, 8 Nov 2015 11:26:09 -0800
Subject: [PATCH 06/10] SPARK-11522 update testcase to resolve scala code
 style check failure

---
 .../sql/hive/execution/HiveUDFSuite.scala | 31 +++++++++----------
 1 file changed, 14 insertions(+), 17 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index 1bd36216ebd3f..d11b7f4c8933f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -349,11 +349,9 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton {
     sqlContext.dropTempTable("testUDF")
   }
 
-
   test("SPARK-11522 select input_file_name from non-parquet table"){
-
-    println("======================================")
-    println("EXTERNAL OpenCSVSerde table pointing to LOCATION")
+    // EXTERNAL OpenCSVSerde table pointing to LOCATION
 
     val location1 = Utils.getSparkClassLoader.getResource("data/files/csv_table").getFile
     sql(s"""CREATE EXTERNAL TABLE csv_table(page_id INT, impressions INT)
           ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
           WITH SERDEPROPERTIES (
            \"separatorChar\" = \",\",
            \"quoteChar\" = \"\\\"\",
            \"escapeChar\" = \"\\\\\")
           LOCATION '$location1'""")
 
     val answer1 = sql("select input_file_name() from csv_table").head().getString(0)
     assert(answer1.contains(location1))
     assert(sql("select input_file_name() from csv_table").distinct().collect().length == 2)
     sql("DROP TABLE csv_table")
 
-
-    println("======================================")
-    println("EXTERNAL pointing to LOCATION")
+    // EXTERNAL pointing to LOCATION
 
     val location2 = Utils.getSparkClassLoader.getResource("data/files/external_t5").getFile
     sql(s"""CREATE EXTERNAL table external_t5 (c1 int, c2 int)
           row format delimited fields terminated by ','
           location '$location2'""")
 
     val answer2 = sql("SELECT input_file_name() as file FROM external_t5").head().getString(0)
     assert(answer2.contains("external_t5"))
-    assert(sql("SELECT input_file_name() as file FROM external_t5").distinct().collect().length == 1)
+    assert(sql("SELECT input_file_name() as file FROM external_t5")
+      .distinct().collect().length == 1)
     sql("DROP TABLE external_t5")
 
-    println("======================================")
-    println("External parquet pointing to LOCATION")
+    // External parquet pointing to LOCATION
 
     val location3 = Utils.getSparkClassLoader.getResource("data/files/external_parquet").getFile
     sql(s"""CREATE EXTERNAL table external_parquet(c1 int, c2 int)
           stored as parquet
           LOCATION '$location3'""")
 
-    val answer3 = sql("SELECT input_file_name() as file FROM external_parquet").head().getString(0)
+    val answer3 = sql("SELECT input_file_name() as file FROM external_parquet")
+      .head().getString(0)
     assert(answer3.contains("external_parquet"))
-    assert(sql("SELECT input_file_name() as file FROM external_parquet").distinct().collect().length == 1)
+    assert(sql("SELECT input_file_name() as file FROM external_parquet")
+      .distinct().collect().length == 1)
     sql("DROP TABLE external_parquet")
 
-    println("======================================")
-    println("Non-External parquet pointing to /tmp/...")
+    // Non-External parquet pointing to /tmp/...
sql("CREATE table internal_parquet_tmp(c1 int, c2 int) " + " stored as parquet " + " as select 1, 2") - val answer4 = sql("SELECT input_file_name() as file FROM internal_parquet_tmp").head().getString(0) + val answer4 = sql("SELECT input_file_name() as file FROM internal_parquet_tmp") + .head().getString(0) assert(answer4.contains("internal_parquet_tmp")) - assert(sql("SELECT input_file_name() as file FROM internal_parquet_tmp").distinct().collect().length == 1) + assert(sql("SELECT input_file_name() as file FROM internal_parquet_tmp") + .distinct().collect().length == 1) sql("DROP TABLE internal_parquet_tmp") - } - } class TestPair(x: Int, y: Int) extends Writable with Serializable { From 1a0bb3a760ed08aa818e199eb0d69744b38663c5 Mon Sep 17 00:00:00 2001 From: xin Wu Date: Fri, 13 Nov 2015 16:55:15 -0800 Subject: [PATCH 07/10] SPARK-11522 implement review comment --- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 79f7611c60cae..7db583468792e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -256,6 +256,7 @@ class HadoopRDD[K, V]( override def close() { if (reader != null) { + SqlNewHadoopRDD.unsetInputFileName() // Close the reader and release it. Note: it's very important that we don't close the // reader more than once, since that exposes us to MAPREDUCE-5918 when running against // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic From f909f6e799e8465bff56c22ccf8667021f10cf16 Mon Sep 17 00:00:00 2001 From: xin Wu Date: Sun, 15 Nov 2015 12:16:14 -0800 Subject: [PATCH 08/10] SPARK-11522 update testcase to implement remove comments --- .../sql/hive/execution/HiveUDFSuite.scala | 78 +++++++++++-------- 1 file changed, 45 insertions(+), 33 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index d11b7f4c8933f..538c3b7e334e4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -354,60 +354,72 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton { // EXTERNAL OpenCSVSerde table pointing to LOCATION val location1 = Utils.getSparkClassLoader.getResource("data/files/csv_table").getFile - sql(s"""CREATE EXTERNAL TABLE csv_table(page_id INT, impressions INT) - ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' - WITH SERDEPROPERTIES ( - \"separatorChar\" = \",\", - \"quoteChar\" = \"\\\"\", - \"escapeChar\" = \"\\\\\") - LOCATION '$location1'""") - - val answer1 = sql("select input_file_name() from csv_table").head().getString(0) + sql( + s"""CREATE EXTERNAL TABLE csv_table(page_id INT, impressions INT) + ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' + WITH SERDEPROPERTIES ( + \"separatorChar\" = \",\", + \"quoteChar\" = \"\\\"\", + \"escapeChar\" = \"\\\\\") + LOCATION '$location1' + """) + + val answer1 = + sql("SELECT input_file_name() FROM csv_table").head().getString(0) assert(answer1.contains(location1)) - assert(sql("select input_file_name() from csv_table").distinct().collect().length == 2) + + val count1 = sql("SELECT input_file_name() FROM csv_table").distinct().count() + assert(count1 == 2) sql("DROP 
From f909f6e799e8465bff56c22ccf8667021f10cf16 Mon Sep 17 00:00:00 2001
From: xin Wu
Date: Sun, 15 Nov 2015 12:16:14 -0800
Subject: [PATCH 08/10] SPARK-11522 update testcase to implement review
 comments

---
 .../sql/hive/execution/HiveUDFSuite.scala | 78 +++++++++++--------
 1 file changed, 45 insertions(+), 33 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index d11b7f4c8933f..538c3b7e334e4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -354,60 +354,72 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton {
     // EXTERNAL OpenCSVSerde table pointing to LOCATION
 
     val location1 = Utils.getSparkClassLoader.getResource("data/files/csv_table").getFile
-    sql(s"""CREATE EXTERNAL TABLE csv_table(page_id INT, impressions INT)
-          ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
-          WITH SERDEPROPERTIES (
-           \"separatorChar\" = \",\",
-           \"quoteChar\" = \"\\\"\",
-           \"escapeChar\" = \"\\\\\")
-          LOCATION '$location1'""")
-
-    val answer1 = sql("select input_file_name() from csv_table").head().getString(0)
+    sql(
+      s"""CREATE EXTERNAL TABLE csv_table(page_id INT, impressions INT)
+         ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
+         WITH SERDEPROPERTIES (
+           \"separatorChar\" = \",\",
+           \"quoteChar\" = \"\\\"\",
+           \"escapeChar\" = \"\\\\\")
+         LOCATION '$location1'
+       """)
+
+    val answer1 =
+      sql("SELECT input_file_name() FROM csv_table").head().getString(0)
     assert(answer1.contains(location1))
-    assert(sql("select input_file_name() from csv_table").distinct().collect().length == 2)
+
+    val count1 = sql("SELECT input_file_name() FROM csv_table").distinct().count()
+    assert(count1 == 2)
     sql("DROP TABLE csv_table")
 
     // EXTERNAL pointing to LOCATION
 
     val location2 = Utils.getSparkClassLoader.getResource("data/files/external_t5").getFile
-    sql(s"""CREATE EXTERNAL table external_t5 (c1 int, c2 int)
-          row format delimited fields terminated by ','
-          location '$location2'""")
+    sql(
+      s"""CREATE EXTERNAL TABLE external_t5 (c1 int, c2 int)
+         ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+         LOCATION '$location2'
+       """)
 
-    val answer2 = sql("SELECT input_file_name() as file FROM external_t5").head().getString(0)
+    val answer2 =
+      sql("SELECT input_file_name() as file FROM external_t5").head().getString(0)
     assert(answer2.contains("external_t5"))
-    assert(sql("SELECT input_file_name() as file FROM external_t5")
-      .distinct().collect().length == 1)
+
+    val count2 = sql("SELECT input_file_name() as file FROM external_t5").distinct().count
+    assert(count2 == 1)
     sql("DROP TABLE external_t5")
 
-    // External parquet pointing to LOCATION
+    // External parquet pointing to LOCATION
 
     val location3 = Utils.getSparkClassLoader.getResource("data/files/external_parquet").getFile
-    sql(s"""CREATE EXTERNAL table external_parquet(c1 int, c2 int)
+    sql(
+      s"""CREATE EXTERNAL TABLE external_parquet(c1 int, c2 int)
          stored as parquet
-         LOCATION '$location3'""")
+         LOCATION '$location3'
+       """)
 
-    val answer3 = sql("SELECT input_file_name() as file FROM external_parquet")
-      .head().getString(0)
+    val answer3 =
+      sql("SELECT input_file_name() as file FROM external_parquet").head().getString(0)
     assert(answer3.contains("external_parquet"))
-    assert(sql("SELECT input_file_name() as file FROM external_parquet")
-      .distinct().collect().length == 1)
+
+    val count3 = sql("SELECT input_file_name() as file FROM external_parquet").distinct().count
+    assert(count3 == 1)
     sql("DROP TABLE external_parquet")
 
     // Non-External parquet pointing to /tmp/...
- sql("CREATE table internal_parquet_tmp(c1 int, c2 int) " + - " stored as parquet " + - " as select 1, 2") + sql("CREATE TABLE parquet_tmp(c1 int, c2 int) " + + " STORED AS parquet " + + " AS SELECT 1, 2") - val answer4 = sql("SELECT input_file_name() as file FROM internal_parquet_tmp") - .head().getString(0) - assert(answer4.contains("internal_parquet_tmp")) - assert(sql("SELECT input_file_name() as file FROM internal_parquet_tmp") - .distinct().collect().length == 1) - sql("DROP TABLE internal_parquet_tmp") - } + val answer4 = + sql("SELECT input_file_name() as file FROM parquet_tmp").head().getString(0) + assert(answer4.contains("parquet_tmp")) + val count4 = sql("SELECT input_file_name() as file FROM parquet_tmp").distinct().count + assert(count4 == 1) + sql("DROP TABLE parquet_tmp") + } } class TestPair(x: Int, y: Int) extends Writable with Serializable { From 12c4f755f867fd3efdea0a8361a59d8df1125c3d Mon Sep 17 00:00:00 2001 From: xin Wu Date: Sun, 15 Nov 2015 17:22:40 -0800 Subject: [PATCH 09/10] SPARK-11522 remove added data files and update testcase --- .../resources/data/files/csv_table/data1.csv | 1 - .../resources/data/files/csv_table/data2.csv | 1 - .../data/files/external_parquet/_SUCCESS | 0 .../files/external_parquet/_common_metadata | Bin 320 -> 0 bytes .../data/files/external_parquet/_metadata | Bin 542 -> 0 bytes ...ae2-f8ef-4c73-9cae-399570a999a5.gz.parquet | Bin 540 -> 0 bytes .../resources/data/files/external_t5/data.txt | 1 - .../sql/hive/execution/HiveUDFSuite.scala | 89 +++++++++++------- 8 files changed, 53 insertions(+), 39 deletions(-) delete mode 100644 sql/hive/src/test/resources/data/files/csv_table/data1.csv delete mode 100644 sql/hive/src/test/resources/data/files/csv_table/data2.csv delete mode 100644 sql/hive/src/test/resources/data/files/external_parquet/_SUCCESS delete mode 100644 sql/hive/src/test/resources/data/files/external_parquet/_common_metadata delete mode 100644 sql/hive/src/test/resources/data/files/external_parquet/_metadata delete mode 100644 sql/hive/src/test/resources/data/files/external_parquet/part-r-00000-12ba2ae2-f8ef-4c73-9cae-399570a999a5.gz.parquet delete mode 100644 sql/hive/src/test/resources/data/files/external_t5/data.txt diff --git a/sql/hive/src/test/resources/data/files/csv_table/data1.csv b/sql/hive/src/test/resources/data/files/csv_table/data1.csv deleted file mode 100644 index 0e88c499a701c..0000000000000 --- a/sql/hive/src/test/resources/data/files/csv_table/data1.csv +++ /dev/null @@ -1 +0,0 @@ -1,2 \ No newline at end of file diff --git a/sql/hive/src/test/resources/data/files/csv_table/data2.csv b/sql/hive/src/test/resources/data/files/csv_table/data2.csv deleted file mode 100644 index 7208c2182900c..0000000000000 --- a/sql/hive/src/test/resources/data/files/csv_table/data2.csv +++ /dev/null @@ -1 +0,0 @@ -2.4 \ No newline at end of file diff --git a/sql/hive/src/test/resources/data/files/external_parquet/_SUCCESS b/sql/hive/src/test/resources/data/files/external_parquet/_SUCCESS deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/sql/hive/src/test/resources/data/files/external_parquet/_common_metadata b/sql/hive/src/test/resources/data/files/external_parquet/_common_metadata deleted file mode 100644 index 60483460a89acd5cadcfdb2970a83593b7b2596b..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 320 zcmb7=K}*9x5QVqVW3Q0}Jd_2&LR=cNi6MIN=CODcDbw9a4Q|qOcY{dC|M2HJN)@l3 zW_a_ydBd#lAJZ_1?;hr^2m88l>IGW}PgxjT1kuSxt$w?dow0ZxpGV8R-3jP{tm2yz 
From 12c4f755f867fd3efdea0a8361a59d8df1125c3d Mon Sep 17 00:00:00 2001
From: xin Wu
Date: Sun, 15 Nov 2015 17:22:40 -0800
Subject: [PATCH 09/10] SPARK-11522 remove added data files and update
 testcase

---
 .../resources/data/files/csv_table/data1.csv  |   1 -
 .../resources/data/files/csv_table/data2.csv  |   1 -
 .../data/files/external_parquet/_SUCCESS      |   0
 .../files/external_parquet/_common_metadata   | Bin 320 -> 0 bytes
 .../data/files/external_parquet/_metadata     | Bin 542 -> 0 bytes
 ...ae2-f8ef-4c73-9cae-399570a999a5.gz.parquet | Bin 540 -> 0 bytes
 .../resources/data/files/external_t5/data.txt |   1 -
 .../sql/hive/execution/HiveUDFSuite.scala     | 89 +++++++++++------
 8 files changed, 53 insertions(+), 39 deletions(-)
 delete mode 100644 sql/hive/src/test/resources/data/files/csv_table/data1.csv
 delete mode 100644 sql/hive/src/test/resources/data/files/csv_table/data2.csv
 delete mode 100644 sql/hive/src/test/resources/data/files/external_parquet/_SUCCESS
 delete mode 100644 sql/hive/src/test/resources/data/files/external_parquet/_common_metadata
 delete mode 100644 sql/hive/src/test/resources/data/files/external_parquet/_metadata
 delete mode 100644 sql/hive/src/test/resources/data/files/external_parquet/part-r-00000-12ba2ae2-f8ef-4c73-9cae-399570a999a5.gz.parquet
 delete mode 100644 sql/hive/src/test/resources/data/files/external_t5/data.txt

diff --git a/sql/hive/src/test/resources/data/files/csv_table/data1.csv b/sql/hive/src/test/resources/data/files/csv_table/data1.csv
deleted file mode 100644
index 0e88c499a701c..0000000000000
--- a/sql/hive/src/test/resources/data/files/csv_table/data1.csv
+++ /dev/null
@@ -1 +0,0 @@
-1,2
\ No newline at end of file
diff --git a/sql/hive/src/test/resources/data/files/csv_table/data2.csv b/sql/hive/src/test/resources/data/files/csv_table/data2.csv
deleted file mode 100644
index 7208c2182900c..0000000000000
--- a/sql/hive/src/test/resources/data/files/csv_table/data2.csv
+++ /dev/null
@@ -1 +0,0 @@
-2.4
\ No newline at end of file
diff --git a/sql/hive/src/test/resources/data/files/external_parquet/_SUCCESS b/sql/hive/src/test/resources/data/files/external_parquet/_SUCCESS
deleted file mode 100644
index e69de29bb2d1d..0000000000000
diff --git a/sql/hive/src/test/resources/data/files/external_parquet/_common_metadata b/sql/hive/src/test/resources/data/files/external_parquet/_common_metadata
deleted file mode 100644
index 60483460a89acd5cadcfdb2970a83593b7b2596b..0000000000000000000000000000000000000000
GIT binary patch
literal 0
HcmV?d00001

literal 320
zcmb7=K}*9x5QVqVW3Q0}Jd_2&LR=cNi6MIN=CODcDbw9a4Q|qOcY{dC|M2HJN)@l3
zW_a_ydBd#lAJZ_1?;hr^2m88l>IGW}PgxjT1kuSxt$w?dow0ZxpGV8R-3jP{tm2yz
z?qCGrhXK8?`!~^|2Mr#gkJ-rmdyl;0&f7tGp76RsqaClFM&3dDHKG*KkG`h!xI;@q
zHy8t3Lk>bQ;Xhk>HBM9Vhfe=ZKTj;WJzjj(+QsE|XpCND*Gd+J%u1OflyI}kRhDas
Ws;pI!OGULV(?TzqoH4eh-+cn#9APB@

diff --git a/sql/hive/src/test/resources/data/files/external_parquet/_metadata b/sql/hive/src/test/resources/data/files/external_parquet/_metadata
deleted file mode 100644
index ffdf6760bef965dafad5290abd137936fda38b5d..0000000000000000000000000000000000000000
GIT binary patch
literal 0
HcmV?d00001

literal 542
zcmbu7O-sW-5QaCgNDrQbED7{b78DEFknbk77jGVm2QMOJy4loX(vQssv89*(3V(z@
z(rs*u9>qP(4zu&lGra8R?!hy3Yj}TAEkV4FEB3-mFj~mauXOWl%-mDvA*@>+%QlCq
zQ&K_Q{h^*e0Ux+e^MvO#Wc`4q3^)zaH0ryMrYS_ubnPh7s^${6JBk;f_PN!JH(FT7
zk9d9E9-1u;A@m)w%FsVaJM9{@IKbII=r@?
zY@}k{(9Mcs5UjgMDd|fCkyWf-sP`46wZUAONk7c?+B}oM{R>ikkyMOyL#vLGzQx82
zV5}vVl};iH04KS~tYma+#Uf(?%b4~o6*RjW3)-YF?T%A
jsr!yAn9mwDm)G6E5!?^C=Q

-    val location1 = Utils.getSparkClassLoader.getResource("data/files/csv_table").getFile
-    sql(
-      s"""CREATE EXTERNAL TABLE csv_table(page_id INT, impressions INT)
+    // EXTERNAL OpenCSVSerde table pointing to LOCATION
+
+    val file1 = new File(tempDir + "/data1")
+    val writer1 = new PrintWriter(file1)
+    writer1.write("1,2")
+    writer1.close()
+
+    val file2 = new File(tempDir + "/data2")
+    val writer2 = new PrintWriter(file2)
+    writer2.write("1,2")
+    writer2.close()
+
+    sql(
+      s"""CREATE EXTERNAL TABLE csv_table(page_id INT, impressions INT)
         ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
         WITH SERDEPROPERTIES (
           \"separatorChar\" = \",\",
           \"quoteChar\" = \"\\\"\",
           \"escapeChar\" = \"\\\\\")
-        LOCATION '$location1'
+        LOCATION '$tempDir'
       """)
 
-    val answer1 =
-      sql("SELECT input_file_name() FROM csv_table").head().getString(0)
-    assert(answer1.contains(location1))
+    val answer1 =
+      sql("SELECT input_file_name() FROM csv_table").head().getString(0)
+    assert(answer1.contains("data1") || answer1.contains("data2"))
 
-    val count1 = sql("SELECT input_file_name() FROM csv_table").distinct().count()
-    assert(count1 == 2)
-    sql("DROP TABLE csv_table")
+    val count1 = sql("SELECT input_file_name() FROM csv_table").distinct().count()
+    assert(count1 == 2)
+    sql("DROP TABLE csv_table")
 
-    // EXTERNAL pointing to LOCATION
+    // EXTERNAL pointing to LOCATION
 
-    val location2 = Utils.getSparkClassLoader.getResource("data/files/external_t5").getFile
-    sql(
-      s"""CREATE EXTERNAL TABLE external_t5 (c1 int, c2 int)
+    sql(
+      s"""CREATE EXTERNAL TABLE external_t5 (c1 int, c2 int)
         ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
-        LOCATION '$location2'
+        LOCATION '$tempDir'
       """)
 
     val answer2 =
       sql("SELECT input_file_name() as file FROM external_t5").head().getString(0)
-    assert(answer2.contains("external_t5"))
+    assert(answer1.contains("data1") || answer1.contains("data2"))
 
-    val count2 = sql("SELECT input_file_name() as file FROM external_t5").distinct().count
-    assert(count2 == 1)
-    sql("DROP TABLE external_t5")
+    val count2 = sql("SELECT input_file_name() as file FROM external_t5").distinct().count
+    assert(count2 == 2)
+    sql("DROP TABLE external_t5")
+  }
 
+  withTempDir { tempDir =>
 
-    // External parquet pointing to LOCATION
-
-    val location3 = Utils.getSparkClassLoader.getResource("data/files/external_parquet").getFile
-    sql(
-      s"""CREATE EXTERNAL TABLE external_parquet(c1 int, c2 int)
-         stored as parquet
-         LOCATION '$location3'
+    // External parquet pointing to LOCATION
+
+    val parquetLocation = tempDir + "/external_parquet"
+    sql("SELECT 1, 2").write.parquet(parquetLocation)
+
+    sql(
+      s"""CREATE EXTERNAL TABLE external_parquet(c1 int, c2 int)
+         STORED AS PARQUET
+         LOCATION '$parquetLocation'
       """)
 
+    val answer3 =
+      sql("SELECT input_file_name() as file FROM external_parquet").head().getString(0)
+    assert(answer3.contains("external_parquet"))
 
-    val count3 = sql("SELECT input_file_name() as file FROM external_parquet").distinct().count
-    assert(count3 == 1)
-    sql("DROP TABLE external_parquet")
+    val count3 = sql("SELECT input_file_name() as file FROM external_parquet").distinct().count
+    assert(count3 == 1)
+    sql("DROP TABLE external_parquet")
+  }
 
     // Non-External parquet pointing to /tmp/...

From eeaa6b6eed2812b879911aba03922ec305459b88 Mon Sep 17 00:00:00 2001
From: xin Wu
Date: Sun, 15 Nov 2015 20:35:46 -0800
Subject: [PATCH 10/10] SPARK-11522 resolve conflict

---
 .../org/apache/spark/sql/hive/execution/HiveUDFSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index 4fdf28cf6785f..9deb1a6db15ad 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectIns
 import org.apache.hadoop.hive.serde2.{AbstractSerDe, SerDeStats}
 import org.apache.hadoop.io.Writable
 import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.util.Utils
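The final shape of the test relies on SQLTestUtils' withTempDir to create the data directory up front and clean it afterwards. A minimal sketch of such a helper — the real Spark trait differs in detail, so treat this as an assumption-laden illustration — is:

    import java.io.File
    import java.nio.file.Files

    // Sketch of a withTempDir-style helper in the spirit of SQLTestUtils.
    def withTempDir(f: File => Unit): Unit = {
      val dir = Files.createTempDirectory("spark-test").toFile
      try f(dir) finally {
        // Best-effort recursive delete so every run starts from a clean slate.
        def delete(file: File): Unit = {
          Option(file.listFiles()).foreach(_.foreach(delete))
          file.delete()
        }
        delete(dir)
      }
    }

Generating the CSV and parquet inputs inside the temporary directory is what let PATCH 09 drop the binary test resources that PATCH 02 had checked in.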