
[SPARK-4131][SQL] support writing data into the filesystem from queries #4380

Closed
wants to merge 17 commits

Conversation

scwf
Contributor

@scwf scwf commented Feb 5, 2015

Support writing data into the filesystem from queries
syntax:
INSERT OVERWRITE [LOCAL] DIRECTORY directory [ROW FORMAT row_format] STORED AS file_format SELECT ... FROM ...
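For illustration, a full statement in the proposed syntax might look like the following (the table `src`, the delimiter, and the output path are made-up examples, not from the PR):

```scala
// Hypothetical usage of the proposed syntax. The table name `src` and the
// output path are made-up illustrations, not taken from the PR.
val query =
  """INSERT OVERWRITE LOCAL DIRECTORY '/tmp/spark_out'
    |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    |STORED AS TEXTFILE
    |SELECT key, value FROM src""".stripMargin

// With a HiveContext in scope this would be submitted as:
// hiveContext.sql(query)
```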

@scwf scwf changed the title [SPARK-4131] [SQL] [WIP] support writing data into the filesystem from queries [SPARK-4131][SQL][WIP] support writing data into the filesystem from queries Feb 5, 2015
@SparkQA

SparkQA commented Feb 5, 2015

Test build #26814 has finished for PR 4380 at commit cd1cd10.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class WriteToDirectory[T](
    • case class WriteToDirectory(
    • assert(valueClass != null, "Output value class not set")
    • assert(outputFileFormatClassName != null, "Output format class not set")

execute()
logWarning("use execute collect")
Array.empty[Row]
}
Contributor Author

Seems this is not necessary.

@SparkQA

SparkQA commented Feb 6, 2015

Test build #26918 has finished for PR 4380 at commit 8d4dfae.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class WriteToDirectory[T](
    • case class WriteToDirectory(
    • assert(valueClass != null, "Output value class not set")
    • assert(outputFileFormatClassName != null, "Output format class not set")

@SparkQA

SparkQA commented Feb 6, 2015

Test build #26919 has finished for PR 4380 at commit 3d8a460.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class WriteToDirectory[T](
    • case class WriteToDirectory(
    • assert(valueClass != null, "Output value class not set")
    • assert(outputFileFormatClassName != null, "Output format class not set")

@scwf
Contributor Author

scwf commented Feb 7, 2015

/cc @liancheng can you help review this?

@SparkQA

SparkQA commented Feb 7, 2015

Test build #26982 has finished for PR 4380 at commit d45e915.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class WriteToDirectory[T](
    • case class WriteToDirectory(
    • assert(valueClass != null, "Output value class not set")
    • assert(outputFileFormatClassName != null, "Output format class not set")

@SparkQA

SparkQA commented Feb 7, 2015

Test build #26984 has finished for PR 4380 at commit 0561486.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class DescribeCommand(
    • case class WriteToDirectory[T](
    • case class WriteToDirectory(
    • assert(valueClass != null, "Output value class not set")
    • assert(outputFileFormatClassName != null, "Output format class not set")

@SparkQA

SparkQA commented Feb 7, 2015

Test build #26983 has finished for PR 4380 at commit 743a89d.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
    • case class WriteToDirectory[T](
    • case class WriteToDirectory(
    • assert(valueClass != null, "Output value class not set")
    • assert(outputFileFormatClassName != null, "Output format class not set")

@scwf
Contributor Author

scwf commented Feb 7, 2015

retest this please

AttributeReference("col_name", StringType, nullable = false)(),
AttributeReference("data_type", StringType, nullable = false)(),
AttributeReference("comment", StringType, nullable = false)())
}
Contributor

Hi @scwf,
FYI, the DescribeCommand has been moved into sources.ddl.scala; can you remove it from here?

Contributor Author

Yeah, my bad, I should remove it from here.

@SparkQA

SparkQA commented Feb 7, 2015

Test build #26990 has finished for PR 4380 at commit 0561486.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class DescribeCommand(
    • case class WriteToDirectory[T](
    • case class WriteToDirectory(
    • assert(valueClass != null, "Output value class not set")
    • assert(outputFileFormatClassName != null, "Output format class not set")

@SparkQA

SparkQA commented Feb 7, 2015

Test build #26993 has finished for PR 4380 at commit 8a17484.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class WriteToDirectory[T](
    • case class WriteToDirectory(
    • assert(valueClass != null, "Output value class not set")
    • assert(outputFileFormatClassName != null, "Output format class not set")

@liancheng
Contributor

Hey @scwf, @yhuai is working on generalized write support for the data source API, which I believe covers your concerns here. Please refer to #4294 and #4446 for details.

@liancheng
Contributor

Actually @yhuai's work doesn't cover your concerns entirely, but you can build yours upon his. Basically it's a sql("<some query>").save("path", "some.data.source"). And to support arbitrary Hive SerDes, we can have a Hive SerDe data source, which can be merged into the Hive data source (much) later once we are able to extract functionalities of HiveContext into a separate data source.

@scwf
Contributor Author

scwf commented Feb 15, 2015

Yeah, got it, but that may be much later. Is it possible to let this in as a transition, since:
1. This syntax is a basic functional point in HiveQL and is useful to our customers.
2. This PR only adds the implementation in the hive subproject.

@marmbrus
Contributor

ping. any update here?

@scwf scwf changed the title [SPARK-4131][SQL][WIP] support writing data into the filesystem from queries [SPARK-4131][SQL] support writing data into the filesystem from queries Mar 19, 2015
@scwf
Contributor Author

scwf commented Mar 19, 2015

@marmbrus, according to @liancheng's suggestion this PR relies on the Hive SerDe data source (not implemented yet), so this version adds a `WriteToDirs` to implement the feature. Could you take a look and give some feedback?

serializer
}

// maybe we can make it as a common method, share with `InsertIntoHiveTable`
Contributor

+1

I think my biggest feedback is that this seems like a lot of new code, given we already have the ability to write data using arbitrary SerDes to a file system. Can we accomplish the same thing with only small changes to the parser and maybe a small logical / physical node? Or am I missing something?

Contributor Author

Yes, in InsertIntoHiveTable we can write data using arbitrary SerDes to a file system. But we cannot reuse InsertIntoHiveTable to share the same physical plan, since:
1. InsertIntoHiveTable and WriteToDirectory have different inputs.
2. InsertIntoHiveTable has complex logic to handle dynamic partitioning.

So I think we can extract a common interface (SaveAsHiveFile) to reuse the code for writing data to the file system.
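The refactoring described here can be sketched structurally. This is an illustrative skeleton only; the method bodies and signatures are invented placeholders, not the PR's actual code:

```scala
// Illustrative skeleton of the proposed refactoring: both operators mix in a
// shared SaveAsHiveFile trait so the file-writing code lives in one place.
// All bodies are stand-in placeholders, not the PR's real implementation.
trait SaveAsHiveFile {
  // Shared logic: write the rows of `data` out as Hive-formatted files.
  def saveAsHiveFile(data: Seq[String], path: String): String =
    s"wrote ${data.size} rows to $path"
}

// Writes into a Hive table (the real operator also handles partitioning).
case class InsertIntoHiveTable(table: String) extends SaveAsHiveFile {
  def run(data: Seq[String]): String = saveAsHiveFile(data, s"warehouse/$table")
}

// Writes directly to a user-supplied directory.
case class WriteToDirectory(path: String, isLocal: Boolean) extends SaveAsHiveFile {
  def run(data: Seq[String]): String = saveAsHiveFile(data, path)
}
```

Each operator keeps its own planning logic (dynamic partitioning stays in InsertIntoHiveTable), while the write path is shared through the trait.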

@SparkQA

SparkQA commented Apr 21, 2015

Test build #30628 has started for PR 4380 at commit 932c37c.

@SparkQA

SparkQA commented Apr 21, 2015

Test build #30633 has started for PR 4380 at commit 60e3f84.

@scwf
Contributor Author

scwf commented Apr 21, 2015

/cc @marmbrus

@SparkQA

SparkQA commented Apr 30, 2015

Test build #31424 has started for PR 4380 at commit bc8f71b.

@scwf
Contributor Author

scwf commented May 3, 2015

Updated. To summarize:
1. Get an unresolved plan WriteToDirectory(path, child, isLocal, extra: ASTNode) in HiveQl from the Hive AST.

2. Analyze WriteToDirectory(path: String, child: LogicalPlan, isLocal: Boolean, extra: ASTNode) into WriteToDirectory(path: String, child: LogicalPlan, isLocal: Boolean, desc: TableDesc) in the HiveContext analyzer.

3. Transform WriteToDirectory(path: String, child: LogicalPlan, isLocal: Boolean, desc: TableDesc) into execution.WriteToDirectory during query planning.

4. Extract a common interface, SaveAsHiveFile, to share the code for writing data to the file system.
/cc @marmbrus
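A minimal sketch of steps 1-3, using stand-in types for ASTNode and TableDesc (the real ones come from Hive), to show how the plan changes shape as it moves through analysis:

```scala
// Stand-in types: the real ASTNode and TableDesc come from Hive; these
// placeholders only illustrate the flow of the plan through analysis.
case class ASTNode(text: String)
case class TableDesc(serde: String)

// Step 1: the parser emits an unresolved plan still carrying the raw AST.
case class UnresolvedWriteToDirectory(path: String, isLocal: Boolean, extra: ASTNode)

// Step 2: the analyzer replaces the AST fragment with a concrete TableDesc.
case class ResolvedWriteToDirectory(path: String, isLocal: Boolean, desc: TableDesc)

// A toy analyzer rule; the serde choice here is a made-up placeholder.
def analyze(plan: UnresolvedWriteToDirectory): ResolvedWriteToDirectory = {
  val serde =
    if (plan.extra.text.contains("TOK_TABLEROWFORMAT")) "LazySimpleSerDe"
    else "default"
  ResolvedWriteToDirectory(plan.path, plan.isLocal, TableDesc(serde))
}
```

Step 3 would then map the resolved logical node onto a physical execution.WriteToDirectory node during planning.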

@scwf
Contributor Author

scwf commented May 6, 2015

ping

@SparkQA

SparkQA commented May 19, 2015

Test build #33078 has started for PR 4380 at commit 445df13.

// Wait until children are resolved.
case p: LogicalPlan if !p.childrenResolved => p

case WriteToDirectory(path, child, isLocal, extra: ASTNode) =>
Contributor

We shouldn't be holding onto the AST anymore, as this ties us a little too closely to a specific version of Hive. Instead look at how we are doing things in CreateTableAsSelect.

Also, please add a test case where you use this new operation on a Spark SQL temporary table.

Contributor Author

Yeah, thanks, I am updating this.

@SparkQA

SparkQA commented May 21, 2015

Test build #33232 has started for PR 4380 at commit 401fab8.

@SparkQA

SparkQA commented Jul 29, 2015

Test build #38789 has finished for PR 4380 at commit 33a2b0a.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • assert(valueClass != null, "Output value class not set")
    • assert(outputFileFormatClassName != null, "Output format class not set")
    • case class WriteToDirectory(

@scwf
Contributor Author

scwf commented Jul 29, 2015

retest this please

@SparkQA

SparkQA commented Jul 29, 2015

Test build #38811 has finished for PR 4380 at commit 33a2b0a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 29, 2015

Test build #144 has finished for PR 4380 at commit 33a2b0a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • assert(valueClass != null, "Output value class not set")
    • assert(outputFileFormatClassName != null, "Output format class not set")
    • case class WriteToDirectory(

@SparkQA

SparkQA commented Aug 4, 2015

Test build #39628 has finished for PR 4380 at commit 438a209.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • assert(valueClass != null, "Output value class not set")
    • assert(outputFileFormatClassName != null, "Output format class not set")
    • case class WriteToDirectory(

@SparkQA

SparkQA commented Aug 8, 2015

Test build #40233 has finished for PR 4380 at commit 085e427.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • assert(valueClass != null, "Output value class not set")
    • assert(outputFileFormatClassName != null, "Output format class not set")
    • case class WriteToDirectory(

@rxin
Contributor

rxin commented Aug 11, 2015

@scwf looks like there is a real failure?

@scwf
Contributor Author

scwf commented Aug 12, 2015

@rxin yes, since we upgraded the Hive version to 1.2.1 we need to adapt the token tree in HiveQl; the old one is not correct for 1.2.1. Updated.

@scwf
Contributor Author

scwf commented Aug 12, 2015

retest this please

@SparkQA

SparkQA commented Aug 12, 2015

Test build #40600 has finished for PR 4380 at commit 9cc8474.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • assert(valueClass != null, "Output value class not set")
    • assert(outputFileFormatClassName != null, "Output format class not set")
    • case class WriteToDirectory(

@litao-buptsse
Contributor

@scwf @liancheng Is there any plan to merge this PR into branch-1.5 soon? I think this feature is pretty useful.

@yhuai
Contributor

yhuai commented Sep 17, 2015

@litao-buptsse Since it is a new feature, I think we will not get it in branch 1.5. But, we can target 1.6.

@litao-buptsse
Contributor

@yhuai Got it, thank you very much.

@litao-buptsse
Contributor

@scwf @yhuai I applied this patch to my branch-1.5 code. It works!

But I found a bug: when I use the lowercase "local", it tries to insert into the HDFS file system.

insert overwrite local directory "/mypath" select ...

When I use the uppercase "LOCAL", it inserts into the local file system correctly.

insert overwrite LOCAL directory "/mypath" select ...

@litao-buptsse
Contributor

case Token(destinationToken(),
       Token("TOK_DIR", path :: formats) :: Nil) =>
  var isLocal = false
  formats.collect {
    case Token("LOCAL", others) =>
      isLocal = true
  }
  WriteToDirectory(
    BaseSemanticAnalyzer.unescapeSQLString(path.getText),
    query,
    isLocal,
    parseTableDesc(formats))

@scwf @yhuai For the code above, should we change the "LOCAL" match to be case-insensitive?

val LOCAL = "(?i)LOCAL".r

case Token(LOCAL(), others) =>
  isLocal = true
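This suggestion works because a Scala regex extractor built with the (?i) inline flag matches regardless of case. A self-contained sketch (the isLocalToken helper is a made-up stand-in for the Token pattern match in the parser):

```scala
// Demonstrates the suggested case-insensitive match for the LOCAL token.
// `tokenText` stands in for the Token(...).getText-style values the parser sees.
val LOCAL = "(?i)LOCAL".r

def isLocalToken(tokenText: String): Boolean = tokenText match {
  case LOCAL() => true  // matches "LOCAL", "local", "Local", ...
  case _       => false
}
```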

@litao-buptsse
Contributor

@scwf @yhuai It works in "local" mode, but not well in "yarn-client" mode.

15/09/19 18:03:48 ERROR thriftserver.SparkSQLDriver: Failed in [insert overwrite directory 'file:///search/tmp/0919' select query from custom.common_pc_pv where logdate='2015091905' limit 10]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5.0 (TID 71, cloud101411240.wd.nm.ss.nop.sogou-op.org): org.apache.hadoop.hive.ql.metadata.HiveException: java.io.IOException: Mkdirs failed to create file:/search/tmp/0919/_temporary/0/_temporary/attempt_201509191803_0005_m_000000_3 (exists=false, cwd=file:/search/hadoop02/yarn_local/usercache/spark/appcache/application_1442391298043_56782/container_1442391298043_56782_01_000008)
        at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:249)
        at org.apache.spark.sql.hive.SparkHiveWriterContainer.initWriters(hiveWriterContainers.scala:115)
        at org.apache.spark.sql.hive.SparkHiveWriterContainer.executorSideSetup(hiveWriterContainers.scala:87)
        at org.apache.spark.sql.hive.SaveAsHiveFile$class.writeToFile$1(SaveAsHiveFile.scala:84)
        at org.apache.spark.sql.hive.SaveAsHiveFile$$anonfun$saveAsHiveFile$3.apply(SaveAsHiveFile.scala:68)
        at org.apache.spark.sql.hive.SaveAsHiveFile$$anonfun$saveAsHiveFile$3.apply(SaveAsHiveFile.scala:68)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Mkdirs failed to create file:/search/tmp/0919/_temporary/0/_temporary/attempt_201509191803_0005_m_000000_3 (exists=false, cwd=file:/search/hadoop02/yarn_local/usercache/spark/appcache/application_1442391298043_56782/container_1442391298043_56782_01_000008)
        at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:442)
        at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:428)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:801)
        at org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat.getHiveRecordWriter(HiveIgnoreKeyTextOutputFormat.java:80)
        at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:261)
        at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:246)
        ... 11 more

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)

@scwf
Contributor Author

scwf commented Sep 21, 2015

@litao-buptsse, I will update this soon, thanks.

@andrewor14
Contributor

@yhuai @liancheng is this still relevant?

@rxin
Contributor

rxin commented Dec 31, 2015

I'm going to close this pull request. If this is still relevant and you are interested in pushing it forward, please open a new pull request. Thanks!

@asfgit asfgit closed this in 7b4452b Dec 31, 2015
@litao-buptsse
Contributor

I think it's a useful feature and is widely used in Hive. Why not finish this feature and merge it into branch-1.6?
