
[SPARK-3007][SQL]Add Dynamic Partition support to Spark Sql hive #2226

Closed
wants to merge 26 commits

Conversation

baishuo
Contributor

baishuo commented Sep 1, 2014

A new PR based on the new master branch. The changes are the same as #1919.

@AmplabJenkins

Can one of the admins verify this patch?

@liancheng
Contributor

ok to test

@baishuo
Contributor Author

baishuo commented Sep 3, 2014

Hi @marmbrus and @liancheng, the latest code has passed "dev/lint-scala" and "sbt/sbt catalyst/test sql/test hive/test" locally.

```scala
fileSinkConf,
jobConfSer,
sc.hiveconf.getBoolean("hive.exec.compress.output", false),
dynamicPartNum)
```
Contributor

Bad indentation :)

@liancheng
Contributor

Please also submit golden answer files for newly whitelisted test cases in HiveCompatibilitySuite.

@liancheng
Contributor

ok to test

@liancheng
Contributor

@baishuo Just added a note about Hive golden answer files to the Spark Wiki (https://cwiki.apache.org/confluence/display/SPARK/Spark+SQL+Internals), please refer to this page to generate and submit those files. Thanks!

@liancheng
Contributor

ok to test

@baishuo
Contributor Author

baishuo commented Sep 5, 2014

The golden answer files related to HiveCompatibilitySuite already exist in the master branch of Spark, so there is no need to add them.

@baishuo
Contributor Author

baishuo commented Sep 5, 2014

Can this PR be tested? :)

@liancheng
Contributor

test this please

@SparkQA

SparkQA commented Sep 5, 2014

Can one of the admins verify this patch?

@liancheng
Contributor

test this please

```scala
if (outputPath == null) {
  throw new IOException("Undefined job output-path")
}
val workPath = new Path(outputPath, dynamicPartPath.substring(1)) // remove "/"
```
Contributor

What about .stripPrefix("/")?
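For illustration (the values here are hypothetical, not from the PR), stripPrefix is safer than substring(1) because it only removes the slash when it is actually present:

```scala
"/part=1".substring(1)      // "part=1"
"part=1".substring(1)       // "art=1"  -- unconditionally drops a character
"/part=1".stripPrefix("/")  // "part=1"
"part=1".stripPrefix("/")   // "part=1" -- no-op when the prefix is absent
```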

@marmbrus
Contributor

marmbrus commented Sep 9, 2014

ok to test

```scala
writerMap += (record._2 -> tempWriter)
tempWriter
}
}
```
Contributor

I think the indentation here is off.

@SparkQA

SparkQA commented Sep 9, 2014

QA tests have started for PR 2226 at commit 15d877b.

  • This patch merges cleanly.

```scala
writer.commitJob()
/*
 * if rowVal is null or "",will return HiveConf.get(hive.exec.default.partition.name) with default
 * */
```
Contributor

Should be:

```scala
/**
 * Returns `rowVal` as a String. If `rowVal` is null or equal to "", returns the default partition name.
 */
```
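A minimal sketch of the helper this comment describes (the name and signature are hypothetical, not taken from the PR):

```scala
// Maps null and "" to the configured default partition name; everything else
// is rendered with toString. `defaultPartName` stands in for the value of
// hive.exec.default.partition.name.
def partitionValueToString(rowVal: Any, defaultPartName: String): String =
  rowVal match {
    case null | "" => defaultPartName
    case v         => v.toString
  }
```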

@marmbrus
Contributor

marmbrus commented Sep 9, 2014

Thanks again for working on this! This will be an awesome feature to have. :) I did a pretty detailed pass and made a few comments. A few high-level notes:

  • There was a lot of unnecessary mutable state, which we try to avoid in Spark SQL. In general we try to limit the use of vars to places where it is critical for performance.
  • If at all possible it would be great to separate the dynamic partition support from the rest of the code. Right now there are a lot of if (dynamicPartNum == 0) or if (record._2 == null) or if (dynamicPartNum > 0) checks interleaved with other code. I think this might be a little easier to follow if common code was broken out into functions and there was a single path for each type of insertion.
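A hypothetical sketch of that second suggestion (all names here are invented for illustration, not the PR's actual code):

```scala
trait RecordWriter { def write(row: Seq[Any]): Unit; def close(): Unit }

// Static insertion: exactly one writer, no partition dispatch in the loop.
def writeStatic(rows: Iterator[Seq[Any]], writer: RecordWriter): Unit = {
  rows.foreach(writer.write)
  writer.close()
}

// Dynamic insertion: one writer per dynamic partition path. Mutation is
// confined to a local map instead of vars shared across the class.
def writeDynamic(
    rows: Iterator[(Seq[Any], String)],
    newWriter: String => RecordWriter): Unit = {
  val writers = scala.collection.mutable.Map.empty[String, RecordWriter]
  for ((row, partition) <- rows) {
    writers.getOrElseUpdate(partition, newWriter(partition)).write(row)
  }
  writers.values.foreach(_.close())
}
```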

```scala
 * Create a `HiveRecordWriter`. A relative dynamic partition path can be used to create a writer
 * for writing data to a dynamic partition.
 */
def open() {
```
Contributor

Seems open is not a good name here. Maybe rename it?

Contributor

Maybe init()? Also, I forgot to update the comments.

Contributor

Is it always called after executorSideSetup? If so, can we rename it to something like setupWriter (or initWriter) and call it at the end of executorSideSetup instead of calling it in writeToFile?

Contributor

Yea, I also realized this. Renamed this to initWriters and merged it into executorSideSetup. Also merged the commit() call into close().
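In outline, the lifecycle this exchange converges on (method bodies elided; only the method names come from the discussion above):

```scala
class WriterContainer {
  // Writers are initialized as the last step of executor-side setup...
  def executorSideSetup(): Unit = {
    // ...task-side initialization...
    initWriters()
  }

  private def initWriters(): Unit = { /* create HiveRecordWriters */ }

  // ...and commit() is folded into close(), so callers see a single
  // setup/close lifecycle.
  def close(): Unit = {
    // ...flush and release writers...
    commit()
  }

  private def commit(): Unit = { /* commit task output */ }
}
```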

@liancheng
Contributor

Addressed @yhuai's comments except for adding more tests, will add them soon.

@SparkQA

SparkQA commented Sep 18, 2014

QA tests have started for PR 2226 at commit b20a3dc.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 18, 2014

QA tests have finished for PR 2226 at commit b20a3dc.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 23, 2014

QA tests have started for PR 2226 at commit e69ce88.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 23, 2014

QA tests have finished for PR 2226 at commit e69ce88.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 23, 2014

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20679/

@liancheng
Contributor

LGTM

@marmbrus This is finally good to go :)

@baishuo
Contributor Author

baishuo commented Sep 23, 2014

thanks a lot to @liancheng :)

@marmbrus
Contributor

Awesome, thanks guys! Can you remove the "s from the title... I think that is breaking my merge script.

baishuo changed the title from [SPARK-3007][SQL]Add "Dynamic Partition" support to Spark Sql hive to [SPARK-3007][SQL]Add Dynamic Partition support to Spark Sql hive on Sep 24, 2014
@baishuo
Contributor Author

baishuo commented Sep 24, 2014

had remove "s from title @marmbrus

@baishuo
Contributor Author

baishuo commented Sep 24, 2014

I think I should say thank you to @liancheng and @yhuai. I have learned a lot from communicating with you :)

@baishuo
Contributor Author

baishuo commented Sep 25, 2014

Hi @marmbrus, would you please run the merge script again? :)

asfgit closed this in 0bbe7fa Sep 29, 2014
@kayousterhout
Contributor

I've merged this into master. Sorry for the delay -- unicode characters in the commit author names were causing our merge script to crash!

@liancheng
Contributor

Haha, have we updated our merge script to handle unicode? I should note that half of Spark SQL contributors are Chinese :)

asfgit pushed a commit that referenced this pull request Oct 3, 2014
PR #2226 was reverted because it broke Jenkins builds for an unknown reason. This debugging PR aims to fix the Jenkins build.

This PR also fixes two bugs:

1. Compression configurations in `InsertIntoHiveTable` are disabled by mistake

   The `FileSinkDesc` object passed to the writer container doesn't carry any compression related configuration. That configuration is not taken care of until `saveAsHiveFile` is called. This PR moves the compression code forward, to right after the instantiation of the `FileSinkDesc` object (see the sketch after this list).

2. `PreInsertionCasts` doesn't take table partitions into account

   In `castChildOutput`, `table.attributes` only contains non-partition columns, thus for partitioned tables `childOutputDataTypes` never equals `tableOutputDataTypes`. This results in a funny analyzed plan like this:

   ```
   == Analyzed Logical Plan ==
   InsertIntoTable Map(partcol1 -> None, partcol2 -> None), false
    MetastoreRelation default, dynamic_part_table, None
    Project [c_0#1164,c_1#1165,c_2#1166]
     Project [c_0#1164,c_1#1165,c_2#1166]
      Project [c_0#1164,c_1#1165,c_2#1166]
       ... (repeats 99 times) ...
        Project [c_0#1164,c_1#1165,c_2#1166]
         Project [c_0#1164,c_1#1165,c_2#1166]
          Project [1 AS c_0#1164,1 AS c_1#1165,1 AS c_2#1166]
           Filter (key#1170 = 150)
            MetastoreRelation default, src, None
   ```

   Awful though this logical plan looks, it's harmless because all the Projects will be eliminated by the optimizer. I guess that's why this issue hasn't been caught before.
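For the first fix, the relocated compression setup looks roughly like this (a sketch reconstructed from the description above, assuming the Hive 0.12-era `FileSinkDesc(String, TableDesc, Boolean)` constructor; the helper name is invented):

```scala
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}

def newFileSinkDesc(dir: String, tableDesc: TableDesc, hiveconf: HiveConf): FileSinkDesc = {
  val fileSinkConf = new FileSinkDesc(dir, tableDesc, false)
  // Compression is configured immediately after construction, instead of
  // waiting until saveAsHiveFile is called.
  if (hiveconf.getBoolean("hive.exec.compress.output", false)) {
    fileSinkConf.setCompressed(true)
    fileSinkConf.setCompressCodec(hiveconf.get("mapred.output.compression.codec"))
    fileSinkConf.setCompressType(hiveconf.get("mapred.output.compression.type"))
  }
  fileSinkConf
}
```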

Author: Cheng Lian <lian.cs.zju@gmail.com>
Author: baishuo(白硕) <vc_java@hotmail.com>
Author: baishuo <vc_java@hotmail.com>

Closes #2616 from liancheng/dp-fix and squashes the following commits:

21935b6 [Cheng Lian] Adds back deleted trailing space
f471c4b [Cheng Lian] PreInsertionCasts should take table partitions into account
a132c80 [Cheng Lian] Fixes output compression
9c6eb2d [Cheng Lian] Adds tests to verify dynamic partitioning folder layout
0eed349 [Cheng Lian] Addresses @yhuai's comments
26632c3 [Cheng Lian] Adds more tests
9227181 [Cheng Lian] Minor refactoring
c47470e [Cheng Lian] Refactors InsertIntoHiveTable to a Command
6fb16d7 [Cheng Lian] Fixes typo in test name, regenerated golden answer files
d53daa5 [Cheng Lian] Refactors dynamic partitioning support
b821611 [baishuo] pass check style
997c990 [baishuo] use HiveConf.DEFAULTPARTITIONNAME to replace hive.exec.default.partition.name
761ecf2 [baishuo] modify according micheal's advice
207c6ac [baishuo] modify for some bad indentation
caea6fb [baishuo] modify code to pass scala style checks
b660e74 [baishuo] delete a empty else branch
cd822f0 [baishuo] do a little modify
8e7268c [baishuo] update file after test
3f91665 [baishuo(白硕)] Update Cast.scala
8ad173c [baishuo(白硕)] Update InsertIntoHiveTable.scala
051ba91 [baishuo(白硕)] Update Cast.scala
d452eb3 [baishuo(白硕)] Update HiveQuerySuite.scala
37c603b [baishuo(白硕)] Update InsertIntoHiveTable.scala
98cfb1f [baishuo(白硕)] Update HiveCompatibilitySuite.scala
6af73f4 [baishuo(白硕)] Update InsertIntoHiveTable.scala
adf02f1 [baishuo(白硕)] Update InsertIntoHiveTable.scala
1867e23 [baishuo(白硕)] Update SparkHadoopWriter.scala
6bb5880 [baishuo(白硕)] Update HiveQl.scala
asfgit pushed a commit that referenced this pull request Oct 5, 2014
… versions

This is a follow-up to #2226 and #2616 to fix Jenkins master SBT build failures for lower Hadoop versions (1.0.x and 2.0.x).

The root cause is the semantics difference of `FileSystem.globStatus()` between different versions of Hadoop, as illustrated by the following test code:

```scala
object GlobExperiments extends App {
  val conf = new Configuration()
  val fs = FileSystem.getLocal(conf)
  fs.globStatus(new Path("/tmp/wh/*/*/*")).foreach { status =>
    println(status.getPath)
  }
}
```

Target directory structure:

```
/tmp/wh
├── dir0
│   ├── dir1
│   │   └── level2
│   └── level1
└── level0
```

Hadoop 2.4.1 result:

```
file:/tmp/wh/dir0/dir1/level2
```

Hadoop 1.0.4 result:

```
file:/tmp/wh/dir0/dir1/level2
file:/tmp/wh/dir0/level1
file:/tmp/wh/level0
```

In #2226 and #2616, we call `FileOutputCommitter.commitJob()` at the end of the job, and the `_SUCCESS` mark file is written. When working with lower Hadoop versions, due to the `globStatus()` semantics issue, `_SUCCESS` is included as a separate partition data file by `Hive.loadDynamicPartitions()`, and fails partition spec checking. The fix introduced in this PR is kind of a hack: when inserting data with dynamic partitioning, we intentionally avoid writing the `_SUCCESS` marker to work around this issue.
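One way to express that workaround (an assumption about the mechanism based on the description above, not a quote of the patch) is via the standard Hadoop flag that controls the marker file:

```scala
import org.apache.hadoop.mapred.JobConf

// FileOutputCommitter writes _SUCCESS only when this flag is true, so
// disabling it for dynamic-partition inserts keeps the marker out of the
// directory that Hive.loadDynamicPartitions() later scans.
def suppressSuccessMarker(jobConf: JobConf): Unit =
  jobConf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false)
```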

Hive doesn't suffer from this issue because `FileSinkOperator` doesn't call `FileOutputCommitter.commitJob()`; instead, it calls `Utilities.mvFileToFinalPath()` to clean up the output directory and then loads it into the Hive warehouse with `loadDynamicPartitions()`/`loadPartition()`/`loadTable()`. This approach is better because it handles failed jobs and speculative tasks properly. We should add this step to `InsertIntoHiveTable` in another PR.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2663 from liancheng/dp-hadoop-1-fix and squashes the following commits:

0177dae [Cheng Lian] Fixes dynamic partitioning support for lower Hadoop versions