
[SPARK-3007][SQL]Add "Dynamic Partition" support to Spark Sql hive #1919

Closed
wants to merge 12 commits

Conversation

@baishuo (Contributor) commented Aug 13, 2014

For details, please refer to the comments on https://issues.apache.org/jira/browse/SPARK-3007.

@AmplabJenkins:

Can one of the admins verify this patch?

@baishuo (Contributor, Author) commented Aug 13, 2014

I didn't add the related tests since I don't know how to write them; can anyone give me some instructions? :)
I have, however, tested the functionality through SparkSQLCLIDriver, and "sbt/sbt catalyst/test sql/test hive/test" passed.

@marmbrus (Contributor):

There are a couple of ways we can add tests; ideally we would do a little of both:

@@ -93,6 +93,33 @@ private[hive] class SparkHiveHadoopWriter(
      null)
  }

  def open(dynamicPartPath: String) {
    val numfmt = NumberFormat.getInstance()

Review comment (Contributor):

NumberFormat.getInstance() is not thread-safe. We can use a thread-local variable to hold this object, similar to Cast.threadLocalDateFormat
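
For illustration, the thread-local pattern suggested here might look something like the sketch below. The enclosing object name, the digit settings, and the accessor are assumptions made for the example, not the actual patch code:

```scala
import java.text.NumberFormat

object DynamicPartitionWriterHelper {
  // Each thread gets its own NumberFormat, since NumberFormat instances are not thread-safe.
  private val threadLocalNumberFormat = new ThreadLocal[NumberFormat] {
    override def initialValue(): NumberFormat = {
      val fmt = NumberFormat.getInstance()
      fmt.setMinimumIntegerDigits(5) // e.g. part-00001 style numbering of output files
      fmt.setGroupingUsed(false)
      fmt
    }
  }

  // Callers always go through this accessor and never share the instance across threads.
  def numberFormat: NumberFormat = threadLocalNumberFormat.get()
}
```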

Review comment (Contributor):

Just realized this function is a variant of the original open() method within the same file. This should be a bug in the master branch.

Another issue is that SparkHadoopWriter resides in the core project, which is an indirect dependency of sql/hive. Thus, logically, it's not proper to put open(dynamicPartPath: String) here.

Review comment (Contributor):

Oh, it is actually SparkHiveHadoopWriter in sql/hive. Seems we need to rename this file.

@liancheng (Contributor):

@yhuai It would be nice if you can have a look at this PR since you're the expert here :)

@baishuo You can refer to sql/README.md for details about setting up the testing environment.

@@ -271,4 +272,9 @@ object Cast {
      new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    }
  }
  private[sql] val threadLocalNumberFormat = new ThreadLocal[NumberFormat] {

Review comment (Contributor):

Ah, sorry, I didn't make myself clear enough. I meant that you can refer to Cast.threadLocalDateFormat, not add the thread-local version of NumberFormat here, since it's not related to Cast. A better place to hold this could be object SparkHadoopWriter.

@liancheng (Contributor):

Please don't forget to add golden answer files for the test cases newly added to the whitelist in HiveCompatibilitySuite.
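
For reference, whitelisting a case in HiveCompatibilitySuite amounts to adding its name to the suite's whiteList sequence, roughly as sketched below; the surrounding entries are elided, the example names are taken from the golden files mentioned later in this thread, and golden answer files with matching names then need to be checked in:

```scala
// Inside HiveCompatibilitySuite (sketch): each whitelisted name must have
// corresponding golden answer files under sql/hive/src/test/resources/golden.
override def whiteList = Seq(
  // ... existing cases ...
  "dynamic_partition_skip_default",
  "load_dyn_part1"
)
```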

        count += 1
        writer2.write(record)
      }
      for((k,v) <- writerMap) {

Review comment (Contributor):

Space before (

@yhuai (Contributor) commented Aug 18, 2014

@baishuo Thank you for working on it.

I have three general comments.

  1. Hive has a lot of confs that influence how the semantic analyzer works; HiveConf.ConfVars.DYNAMICPARTITIONING (hive.exec.dynamic.partition) and HiveConf.ConfVars.DYNAMICPARTITIONINGMODE (hive.exec.dynamic.partition.mode) are two examples. As long as we generate correct results and can make sure the execution is robust, I think it is not necessary to follow those confs.
  2. For hive.exec.dynamic.partition.mode, I think its purpose is to avoid having too many concurrent file writers in a task. Actually, even with hive.exec.dynamic.partition.mode=strict, we can still have many distinct values in the dynamic partitioning columns and thus too many file writers in a task. For columnar file formats like RCFile, ORC, and Parquet, every file writer internally maintains a memory buffer, so many file writers can significantly increase the memory footprint of a task and can introduce OOMs. Instead of relying on Hive's confs, it is better to provide a way to group data based on the dynamic partitioning columns, so that we do not have many concurrent file writers. Two primitive ideas: we can shuffle the data before inserting, or we can do local grouping and write data in a group-by-group fashion (a rough sketch follows this list). Either way, I feel we may need to introduce changes to the planner.
  3. The last comment is not directly related to this PR. I think it is better to have a general design for how a table is partitioned, so that (hopefully) Hive's directory layout in HDFS is just a special case. I am not sure that creating a single file for every combination of values of the partitioning columns is a good approach. It introduces potential stability issues for the insert operation (too many file writers) and performance issues for both insert and table scan operations. With this approach, we can easily create a lot of small files in HDFS, which puts memory pressure on the HDFS namenode.
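
As a rough illustration of the grouping idea in point 2: if rows arrive sorted or grouped by their dynamic partition path (for example after a shuffle keyed on the dynamic partition columns), a task only ever needs one open file writer at a time. All names and types below are hypothetical; this is only the shape of the approach, not the planner change itself:

```scala
import java.io.Closeable

object GroupedPartitionWrite {
  // Hypothetical sketch: rows are assumed to be pre-grouped by dynamicPartPath,
  // so at most one writer is open at any moment within the task.
  def writeGroupedByPartition[T](
      rows: Iterator[(String, T)],          // (dynamicPartPath, serialized row)
      openWriter: String => Closeable,      // opens a writer for one partition directory
      write: (Closeable, T) => Unit): Unit = {
    var currentPath: String = null
    var currentWriter: Closeable = null
    for ((path, row) <- rows) {
      if (path != currentPath) {
        if (currentWriter != null) currentWriter.close() // finish the previous partition
        currentWriter = openWriter(path)
        currentPath = path
      }
      write(currentWriter, row)
    }
    if (currentWriter != null) currentWriter.close()
  }
}
```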

@baishuo (Contributor, Author) commented Aug 19, 2014

thanks a lot @yhuai and @liancheng :)

@baishuo (Contributor, Author) commented Aug 19, 2014

Hi @marmbrus and @liancheng, I have made some modifications and ran the tests with "sbt/sbt catalyst/test sql/test hive/test". Please help me check whether it is proper when you have time. Thank you :)

@liancheng (Contributor):

Hmm, I see 17 newly whitelisted test cases, but only golden answers for the dynamic_partition case were submitted.

@baishuo (Contributor, Author) commented Aug 20, 2014

I am also curious about that.
I downloaded the master branch and checked the folder sql/hive/src/test/resources/golden.
I found that files beginning with dynamic_partition_skip_default* or load_dyn_part* already exist.

@baishuo (Contributor, Author) commented Aug 20, 2014

Here I try to explain my design idea (the code is mostly in InsertIntoHiveTable.scala):
Let's assume there is a table called table1, which has two columns, col1 and col2, and two partition columns, part1 and part2.

First:
In the case of inserting data into a static partition only, I found that when saveAsHiveFile finishes, the data has been written to a temporary location, a directory like /tmp/hive-root/hive_****/-ext-10000; let's call it TMPLOCATION. Under TMPLOCATION there is a subdirectory /part1=.../part2=..., and all data is stored under TMPLOCATION/part1=.../part2=.... Spark then calls the Hive API loadPartition to move the files to {hivewarehouse}/{tablename}/part1=.../part2=... and update the metadata, and the whole process is done.

If we want to implement dynamic partitioning, we need to use the Hive API loadDynamicPartitions to move the data and update the metadata. But the directory format required by loadDynamicPartitions is a little different from that of loadPartition:

1: In the case of one static partition and one dynamic partition (HQL like "insert overwrite table table1 partition(part1=val1, part2) select a, b, c from ..."), loadDynamicPartitions needs the temporary data to be located at TMPLOCATION/part2=c1, TMPLOCATION/part2=c2, ... (there is NO "part1=val1" level; it will be added during loadDynamicPartitions). loadDynamicPartitions will then move them to {hivewarehouse}/{tablename}/part1=val1/part2=c1, {hivewarehouse}/{tablename}/part1=val1/part2=c2, ..., and update the metadata. Note that in this case loadDynamicPartitions does not need a subdirectory like part1=val1 under TMPLOCATION.

2: In the case of zero static partitions and two dynamic partitions (HQL like "insert overwrite table table1 partition(part1, part2) select a, b, x, c from ..."), loadDynamicPartitions needs the temporary data to be located at TMPLOCATION/part1=.../part2=c1, TMPLOCATION/part1=.../part2=c2, ..., and loadDynamicPartitions will move them to {hivewarehouse}/{tablename}/part1=.../part2=....

So whether there is a static partition in the HQL determines how we create subdirectories under TMPLOCATION. That is why the function getDynamicPartDir exists.
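
To illustrate that rule, here is a sketch only: the real getDynamicPartDir in the patch works on the table's partition schema and the row being written, and its signature differs, but the path-building logic it needs is essentially this:

```scala
object DynamicPartDirExample {
  // Hypothetical sketch: only the dynamic partition columns contribute path
  // segments under TMPLOCATION; static partition values are added later by
  // loadDynamicPartitions.
  def dynamicPartDir(
      partitionCols: Seq[String],           // all partition columns, in order
      staticValues: Map[String, String],    // e.g. Map("part1" -> "val1")
      rowValues: Map[String, String]): String = {
    partitionCols
      .filterNot(staticValues.contains)     // skip static partitions entirely
      .map(col => s"$col=${rowValues(col)}")
      .mkString("/", "/", "")
  }

  // partition(part1=val1, part2) with part2 = "c1"            => "/part2=c1"
  // partition(part1, part2) with part1 = "a1", part2 = "c1"   => "/part1=a1/part2=c1"
}
```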

Second:
Where should we call getDynamicPartDir? It must be somewhere we can get the values of the dynamic partition columns, so we call this function inside "iter.map { row => ... }" in the closure of "val rdd = childRdd.mapPartitions". Once we have the row, we can get the values of the dynamic partition columns. After we get dynamicPartPath from getDynamicPartDir, we pass it to the next RDD through this RDD's output: serializer.serialize(outputData, standardOI) -> dynamicPartPath. (For a static partition, dynamicPartPath is null.)

When the next RDD (the closure in writeToFile) gets the data and dynamicPartPath, we check whether dynamicPartPath is null. If it is not null, we check whether a corresponding writer already exists in writerMap, which stores a writer for each partition. If one exists, we use that writer to write the record. This ensures that data belonging to the same partition is written to the same directory.

loadDynamicPartitions requires that there be no other files under TMPLOCATION except the subdirectories for the dynamic partitions. That is why there are several "if (dynamicPartNum == 0)" checks in writeToFile.
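
Put together, the writer lookup described above is roughly the following. This is a sketch with hypothetical parameter names and types; the actual code builds SparkHiveHadoopWriter instances and also handles commit and close:

```scala
import scala.collection.mutable

object PartitionedWriteExample {
  // Hypothetical sketch of the writerMap lookup: W stands in for the real writer type.
  def writeRecord[W](
      writerMap: mutable.Map[String, W],    // one writer per dynamic partition directory
      newWriter: String => W,               // creates a writer under TMPLOCATION/<dynamicPartPath>
      defaultWriter: W,                     // the single writer used for static-only inserts
      dynamicPartPath: String,
      record: AnyRef,
      write: (W, AnyRef) => Unit): Unit = {
    if (dynamicPartPath == null) {
      // Static-partition-only insert: everything goes through the single writer.
      write(defaultWriter, record)
    } else {
      // Reuse (or create) the writer for this partition directory, so all rows with
      // the same dynamic partition values land in the same directory.
      val writer = writerMap.getOrElseUpdate(dynamicPartPath, newWriter(dynamicPartPath))
      write(writer, record)
    }
  }
}
```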

@baishuo (Contributor, Author) commented Aug 26, 2014

Hi @marmbrus, I have updated the files related to the tests, and all tests passed on my machine. Would you please help to verify this patch when you have time? :) I have written up the thinking behind the code above. Thank you.
@rxin @liancheng

@marmbrus (Contributor):

Thanks for working on this! We will have more time to review it after the Spark 1.1 release.

@marmbrus (Contributor):

ok to test

@SparkQA commented Aug 30, 2014

QA tests have started for PR 1919 at commit 0c324be.

  • This patch merges cleanly.

@SparkQA commented Aug 30, 2014

QA tests have finished for PR 1919 at commit 0c324be.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Sqrt(child: Expression) extends UnaryExpression
    • class TreeNodeRef(val obj: TreeNode[_])

@baishuo (Contributor, Author) commented Sep 1, 2014

Hi @marmbrus, can you help me check why the test failed? I compiled and ran the tests locally, so I thought they would pass the Spark QA test :). There is also a new PR, #2226, with the same changes (tested locally), based on the new master. Would you please run a test on it if this PR still fails? Thank you :)

@liancheng (Contributor):

@baishuo Scala style check failed. See here for details.

@liancheng (Contributor):

Would you mind closing this PR, since #2226 was opened as a replacement?

@baishuo (Contributor, Author) commented Sep 3, 2014

No problem, I'll close this PR.

@baishuo baishuo closed this Sep 3, 2014
asfgit pushed a commit that referenced this pull request Sep 29, 2014
a new PR base on new master.  changes are the same as #1919

Author: baishuo(白硕) <vc_java@hotmail.com>
Author: baishuo <vc_java@hotmail.com>
Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2226 from baishuo/patch-3007 and squashes the following commits:

e69ce88 [Cheng Lian] Adds tests to verify dynamic partitioning folder layout
b20a3dc [Cheng Lian] Addresses @yhuai's comments
096bbbc [baishuo(白硕)] Merge pull request #1 from liancheng/refactor-dp
1093c20 [Cheng Lian] Adds more tests
5004542 [Cheng Lian] Minor refactoring
fae9eff [Cheng Lian] Refactors InsertIntoHiveTable to a Command
528e84c [Cheng Lian] Fixes typo in test name, regenerated golden answer files
c464b26 [Cheng Lian] Refactors dynamic partitioning support
5033928 [baishuo] pass check style
2201c75 [baishuo] use HiveConf.DEFAULTPARTITIONNAME to replace hive.exec.default.partition.name
b47c9bf [baishuo] modify according micheal's advice
c3ab36d [baishuo] modify for some bad indentation
7ce2d9f [baishuo] modify code to pass scala style checks
37c1c43 [baishuo] delete a empty else branch
66e33fc [baishuo] do a little modify
88d0110 [baishuo] update file after test
a3961d9 [baishuo(白硕)] Update Cast.scala
f7467d0 [baishuo(白硕)] Update InsertIntoHiveTable.scala
c1a59dd [baishuo(白硕)] Update Cast.scala
0e18496 [baishuo(白硕)] Update HiveQuerySuite.scala
60f70aa [baishuo(白硕)] Update InsertIntoHiveTable.scala
0a50db9 [baishuo(白硕)] Update HiveCompatibilitySuite.scala
491c7d0 [baishuo(白硕)] Update InsertIntoHiveTable.scala
a2374a8 [baishuo(白硕)] Update InsertIntoHiveTable.scala
701a814 [baishuo(白硕)] Update SparkHadoopWriter.scala
dc24c41 [baishuo(白硕)] Update HiveQl.scala