
[SPARK-3007][SQL] Adds dynamic partitioning support #2616

Closed
wants to merge 28 commits

Conversation

liancheng
Contributor

PR #2226 was reverted because it broke Jenkins builds for an unknown reason. This debugging PR aims to fix the Jenkins build.

This PR also fixes two bugs:

  1. Compression configurations in InsertIntoHiveTable were disabled by mistake

    The FileSinkDesc object passed to the writer container doesn't carry the compression-related configurations. These configurations are not taken care of until saveAsHiveFile is called. This PR moves the compression code forward, right after the instantiation of the FileSinkDesc object (see the sketch after this list).

  2. PreInsertionCasts doesn't take table partitions into account

    In castChildOutput, table.attributes only contains non-partition columns, so for a partitioned table childOutputDataTypes never equals tableOutputDataTypes. This results in funny analyzed plans like this one:

    == Analyzed Logical Plan ==
    InsertIntoTable Map(partcol1 -> None, partcol2 -> None), false
    MetastoreRelation default, dynamic_part_table, None
     Project [c_0#1164,c_1#1165,c_2#1166]
      Project [c_0#1164,c_1#1165,c_2#1166]
       Project [c_0#1164,c_1#1165,c_2#1166]
        ... (repeats 99 times) ...
         Project [c_0#1164,c_1#1165,c_2#1166]
          Project [c_0#1164,c_1#1165,c_2#1166]
           Project [1 AS c_0#1164,1 AS c_1#1165,1 AS c_2#1166]
            Filter (key#1170 = 150)
             MetastoreRelation default, src, None
    

    Awful as this logical plan looks, it's harmless because all of the Projects will be eliminated by the optimizer. That's probably why this issue hasn't been caught before.

@SparkQA

SparkQA commented Oct 1, 2014

QA tests have started for PR 2616 at commit a132c80.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 1, 2014

QA tests have started for PR 2616 at commit f471c4b.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 1, 2014

QA tests have finished for PR 2616 at commit a132c80.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liancheng
Contributor Author

Reverted the accidental trailing space change. However, since this is really dangerous, I fixed it in #2619.

@SparkQA

SparkQA commented Oct 1, 2014

QA tests have finished for PR 2616 at commit 21935b6.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21131/

asfgit pushed a commit that referenced this pull request Oct 1, 2014
MD5 hashes of query strings in `createQueryTest` calls are used to generate golden files, so leaving trailing spaces there can be really dangerous. Got bitten by this while working on #2616: my "smart" IDE automatically removed a trailing space and made Jenkins fail.

(Really should add "no trailing space" to our coding style guidelines!)

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2619 from liancheng/kill-trailing-space and squashes the following commits:

034f119 [Cheng Lian] Kill dangerous trailing space in query string
@liancheng liancheng changed the title [SPARK-3007][SQL] WIP: adds dynamic partitioning support [SPARK-3007][SQL] Adds dynamic partitioning support Oct 3, 2014
@liancheng
Contributor Author

@marmbrus Let's try to merge this one to master and see whether Jenkins accepts it.

@marmbrus
Contributor

marmbrus commented Oct 3, 2014

Tried merging but it failed :(

@kayousterhout what did you end up doing to merge this the first time?

@kayousterhout
Contributor

Comment out the print statement in merge_pr that causes the failure.


@marmbrus
Contributor

marmbrus commented Oct 3, 2014

Hmmm, still failing with:

subprocess.CalledProcessError: Command '[u'git', u'fetch', u'apache', u'master:PR_TOOL_MERGE_PR_2616_MASTER']' returned non-zero exit status 128

@asfgit asfgit closed this in bec0d0e Oct 3, 2014
@scwf
Contributor

scwf commented Oct 4, 2014

Hi @liancheng, the master branch tests failed on my machine for all dynamic partition cases:
[info] - dynamic_partition *** FAILED ***
[info] - Dynamic partition folder layout *** FAILED ***
[info] - dynamic_partition_skip_default *** FAILED ***
[info] - load_dyn_part1 *** FAILED ***
[info] - load_dyn_part10 *** FAILED ***
[info] - load_dyn_part11 *** FAILED ***
[info] - load_dyn_part12 *** FAILED ***
[info] - load_dyn_part13 *** FAILED ***
[info] - load_dyn_part14 *** FAILED ***
[info] - load_dyn_part14_win *** FAILED ***
[info] - load_dyn_part2 *** FAILED ***
[info] - load_dyn_part3 *** FAILED ***
[info] - load_dyn_part4 *** FAILED ***
[info] - load_dyn_part5 *** FAILED ***
[info] - load_dyn_part6 *** FAILED ***
[info] - load_dyn_part8 *** FAILED ***
[info] - load_dyn_part9 *** FAILED ***
[info] *** 17 TESTS FAILED ***

Detailed log:
[info] - dynamic_partition *** FAILED ***
[info] Failed to execute query using catalyst:
[info] Error: get partition: Value for key partcol1 is null or empty
[info] org.apache.hadoop.hive.ql.metadata.HiveException: get partition: Value for key partcol1 is null or empty
[info] at org.apache.hadoop.hive.ql.metadata.Hive.getPartition(Hive.java:1585)
[info] at org.apache.hadoop.hive.ql.metadata.Hive.getPartition(Hive.java:1556)
[info] at org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(Hive.java:1189)

Am I missing something here? My test commands are as follows:
sbt/sbt -Phive assembly
sbt/sbt -Phive test

@liancheng
Contributor Author

@scwf Can you elaborate on the configuration you're using? Details like compilation flags, environment variables, and the build process would be helpful. I've been tracking this failure over the last few days but couldn't reproduce it either locally or on the Jenkins PR builder.

@liancheng
Contributor Author

@scwf Or could you please describe the steps to reproduce this failure from a newly checked out master branch? I guess once you can reproduce it, it happens deterministically.

@liancheng
Contributor Author

Ah, I just found out that I can reproduce it with -Phive. I had been using -Phive,hadoop-2.4 all the time, which is why I couldn't reproduce it. Thanks!

@scwf
Contributor

scwf commented Oct 4, 2014

Yes, I will use -Phive,hadoop-2.4 to see whether it has the problem.

@scwf
Contributor

scwf commented Oct 4, 2014

Using -Phive,hadoop-2.4 is also OK on my local machine.

@liancheng
Contributor Author

So this bug can be triggered by lower versions of Hadoop, e.g. 1.0.3. I haven't validated the exact version range yet.

In Hive.loadDynamicPartitions, Hive calls o.a.h.h.q.e.Utilities.getFileStatusRecurse to glob the temporary directory for data files. It seems that lower versions of Hadoop don't filter out files like _SUCCESS, which causes the problem.

Within Hive, loadDynamicPartitions is only used in operations like LOAD. At the end of a normal insertion into a dynamically partitioned table, FileSinkOperator calls Utilities.mvFileToFinalPath to move the entire temporary directory to the target location, and thus doesn't have this problem.

Utilities.mvFileToFinalPath is more efficient than Hive.loadDynamicPartitions since it doesn't parse and validate partition specs, but it requires some internal Hive data structures like DynamicPartitionCtx. I'll see whether I can mock these data structures and use mvFileToFinalPath instead.

@liancheng
Contributor Author

@scwf Thanks for all the information you provided offline :)

@liancheng
Contributor Author

According to previous failed Jenkins builds (1, 2, etc.), Hadoop 1.0.3 and 2.0 are affected, while 2.2 and above are OK. That explains why this PR, together with #2226, always passes Jenkins -- the PR builder uses Hadoop 2.3.

@scwf
Contributor

scwf commented Oct 4, 2014

Got it.

@liancheng
Contributor Author

The reason why _SUCCESS gets picked up is that the semantics of FileSystem.globStatus changed between Hadoop versions, and Utilities.getFileStatusRecurse relies on it to find all partition data files.

Test code:

```scala
object GlobExperiments extends App {
  val conf = new Configuration()
  val fs = FileSystem.getLocal(conf)
  fs.globStatus(new Path("/tmp/wh/*/*/*")).foreach { status =>
    println(status.getPath)
  }
}
```

Target directory structure:

```
/tmp/wh
├── dir0
│   ├── dir1
│   │   └── level2
│   └── level1
└── level0
```

Hadoop 2.4.1 result:

```
file:/tmp/wh/dir0/dir1/level2
```

Hadoop 1.0.4 result:

```
file:/tmp/wh/dir0/dir1/level2
file:/tmp/wh/dir0/level1
file:/tmp/wh/level0
```

asfgit pushed a commit that referenced this pull request Oct 5, 2014
… versions

This is a follow-up of #2226 and #2616 to fix Jenkins master SBT build failures for lower Hadoop versions (1.0.x and 2.0.x).

The root cause is the semantics difference of `FileSystem.globStatus()` between different versions of Hadoop, as illustrated by the following test code:

```scala
object GlobExperiments extends App {
  val conf = new Configuration()
  val fs = FileSystem.getLocal(conf)
  fs.globStatus(new Path("/tmp/wh/*/*/*")).foreach { status =>
    println(status.getPath)
  }
}
```

Target directory structure:

```
/tmp/wh
├── dir0
│   ├── dir1
│   │   └── level2
│   └── level1
└── level0
```

Hadoop 2.4.1 result:

```
file:/tmp/wh/dir0/dir1/level2
```

Hadoop 1.0.4 result:

```
file:/tmp/wh/dir0/dir1/level2
file:/tmp/wh/dir0/level1
file:/tmp/wh/level0
```

In #2226 and #2616, we call `FileOutputCommitter.commitJob()` at the end of the job, and the `_SUCCESS` marker file is written. When working with lower Hadoop versions, due to the `globStatus()` semantics issue, `_SUCCESS` is included as a separate partition data file by `Hive.loadDynamicPartitions()`, and fails partition spec checking. The fix introduced in this PR is kind of a hack: when inserting data with dynamic partitioning, we intentionally avoid writing the `_SUCCESS` marker to work around this issue.

Hive doesn't suffer from this issue because `FileSinkOperator` doesn't call `FileOutputCommitter.commitJob()`; instead, it calls `Utilities.mvFileToFinalPath()` to clean up the output directory and then loads it into the Hive warehouse with `loadDynamicPartitions()`/`loadPartition()`/`loadTable()`. This approach is better because it handles failed jobs and speculative tasks properly. We should add this step to `InsertIntoHiveTable` in another PR.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2663 from liancheng/dp-hadoop-1-fix and squashes the following commits:

0177dae [Cheng Lian] Fixes dynamic partitioning support for lower Hadoop versions
@liancheng liancheng deleted the dp-fix branch February 24, 2015 17:44