
[SPARK-19256][SQL] Hive bucketing support #19001

Closed
wants to merge 6 commits

Conversation

tejasapatil
Contributor

What changes were proposed in this pull request?

This PR implements both the read-side and write-side changes for supporting Hive bucketing in Spark. I had initially created a PR for just the write-side changes (#18954) for simplicity. If reviewers want to review the reader- and writer-side changes separately, I am happy to wait for the writer-side PR to get merged and then send a new PR for the reader-side changes.

Semantics for read:

  • outputPartitioning while scanning a Hive table is the set of bucketing columns (whether or not the table is partitioned, and whether a single Hive partition or multiple Hive partitions are being read).
  • outputOrdering is the sort columns (more precisely, the prefix of the table's sort columns that is actually being read).
  • When reading multiple Hive partitions of the table, there are multiple files per bucket, so there is no global sorting across buckets and the sort information has to be ignored.
  • See the documentation in HiveTableScanExec, where outputPartitioning and outputOrdering are populated, for the nitty-gritty details. A rough sketch of these rules follows below.
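
To make the read-side semantics concrete, here is a minimal, illustrative sketch of how a scan node could derive its output partitioning and ordering from the table's bucket spec. It is not the code in HiveTableScanExec; the object name, helper names, and the `singleFilePerBucket` flag are assumptions made for illustration.

```scala
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}

// Illustrative sketch only; not the PR's code.
object HiveBucketedReadSemantics {

  // Bucketing columns determine the partitioning no matter how many Hive
  // partitions of the table are being read.
  def scanOutputPartitioning(bucketSpec: Option[BucketSpec], output: Seq[Attribute]): Partitioning =
    bucketSpec match {
      case Some(spec) =>
        val bucketCols = spec.bucketColumnNames.flatMap(name => output.find(_.name == name))
        HashPartitioning(bucketCols, spec.numBuckets) // the PR would use Hive's hash function here
      case None =>
        UnknownPartitioning(0)
    }

  // Sort order survives only when each bucket maps to a single file; with
  // multiple Hive partitions there are several files per bucket, so the sort
  // information is dropped.
  def scanOutputOrdering(
      bucketSpec: Option[BucketSpec],
      output: Seq[Attribute],
      singleFilePerBucket: Boolean): Seq[SortOrder] =
    bucketSpec match {
      case Some(spec) if singleFilePerBucket =>
        spec.sortColumnNames
          .map(name => output.find(_.name == name))
          .takeWhile(_.isDefined) // only the prefix of sort columns actually being read
          .flatten
          .map(SortOrder(_, Ascending))
      case _ => Nil
    }
}
```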

Semantics for write:

  • If the Hive table is bucketed, the INSERT node expects its child's distribution to be based on the hash of the bucket columns; otherwise the required distribution is empty. (For comparison, Spark native bucketing does not enforce a required distribution whether or not the table is bucketed, which saves the shuffle relative to Hive.)
  • Sort ordering for an INSERT node over a Hive table is determined as follows:
Table type             | Normal table      | Bucketed table
non-partitioned insert | Nil               | sort columns
static partition       | Nil               | sort columns
dynamic partitions     | partition columns | (partition columns + bucketId + sort columns)

Just to compare how sort ordering is expressed for Spark native bucketing:

Table type    | Normal table      | Bucketed table
sort ordering | partition columns | (partition columns + bucketId + sort columns)

Why is there a difference? With Hive, since bucketed insertions need a shuffle anyway, the sort ordering can be relaxed for both the non-partitioned and static-partition cases: every RDD partition gets rows corresponding to a single bucket, so those can be written to the corresponding output file after a sort. In the case of dynamic partitions, the rows need to be routed to the appropriate partition, which makes the constraints similar to Spark's. The sketch below illustrates the rule.
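
Purely as an illustration of the rule in the tables above (not code from this PR; the function and parameter names are assumptions), the required ordering for an INSERT into a Hive table could be computed like this:

```scala
import org.apache.spark.sql.catalyst.expressions.Expression

// Illustrative only: required sort ordering for INSERT into a Hive table,
// following the tables above. Names here are assumptions.
def hiveInsertRequiredOrdering(
    dynamicPartitionColumns: Seq[Expression],
    bucketIdExpression: Option[Expression],
    sortColumns: Seq[Expression],
    tableIsBucketed: Boolean): Seq[Expression] = {
  if (dynamicPartitionColumns.isEmpty) {
    // Non-partitioned insert or static partition: the shuffle already sends each
    // bucket to a single RDD partition, so only the sort columns are needed.
    if (tableIsBucketed) sortColumns else Nil
  } else {
    // Dynamic partitions: rows must be grouped per output file, just like
    // Spark native bucketing.
    if (tableIsBucketed) dynamicPartitionColumns ++ bucketIdExpression ++ sortColumns
    else dynamicPartitionColumns
  }
}
```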

  • Only Overwrite mode is allowed for Hive bucketed tables, as any other mode would break the bucketing guarantees of the table. This is a difference wrt how Spark bucketing works.
  • With this PR, no files are created for empty buckets and a query over such a table will fail. Creation of empty files will be supported in a coming iteration. This is also a difference wrt Spark bucketing, which does NOT need files for empty buckets.

Summary of changes done:

  • ClusteredDistribution and HashPartitioning are modified to store the hashing function used.
  • RunnableCommands can now express their required distribution and ordering. This is used by ExecutedCommandExec, which runs these commands (see the sketch after this list).
    • A nice side effect is that the logic for enforcing sort ordering inside FileFormatWriter, which felt out of place, could be removed. Ideally this kind of insertion of physical nodes should be done within the planner, which is what happens with this PR.
  • InsertIntoHiveTable enforces both the distribution and the sort ordering.
  • InsertIntoHadoopFsRelationCommand enforces the sort ordering ONLY (not the distribution).
  • Fixed a bug due to which any ALTER command on a bucketed table (e.g. updating stats) would wipe out the bucketing spec from the metastore. This made insertion into a bucketed table a non-idempotent operation.
  • HiveTableScanExec populates outputPartitioning and outputOrdering based on the table's metadata, configs, and the query.
  • HadoopTableReader uses BucketizedSparkInputFormat for bucketed reads.
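
To show the shape of the API change described above, here is a minimal sketch, assuming method names like requiredDistribution and requiredOrdering; the exact trait and signatures in the PR may differ.

```scala
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.plans.physical.{Distribution, UnspecifiedDistribution}

// Sketch of the idea only: a command declares what it needs, and the planner
// (EnsureRequirements) inserts the exchange/sort nodes, instead of
// FileFormatWriter sorting the data itself.
trait DeclaresWriteRequirements {
  // e.g. a clustered distribution over the bucket columns for a Hive bucketed
  // insert, unspecified otherwise.
  def requiredDistribution: Seq[Distribution] = Seq(UnspecifiedDistribution)

  // One ordering per child, e.g. partition columns ++ bucketId ++ sort columns
  // for a dynamic-partition insert into a bucketed table.
  def requiredOrdering: Seq[Seq[SortOrder]] = Seq(Nil)
}
```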

How was this patch tested?

  • Added new unit tests

@tejasapatil
Contributor Author


package org.apache.hadoop.hive.ql.io;

import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
Member


Hi, @tejasapatil
Is this the only actual Hive dependency? Without this, it seems that BucketizedSparkInputFormat and BucketizedSparkRecordReader can be promoted to sql/core.

Contributor Author


What do we gain from moving it to sql/core, given that these classes are quite specific to Hive? I don't see any use cases besides Hive benefiting from them, so I decided to keep them in sql/hive and keep sql/core cleaner.

Member


I see. Thanks.

@SparkQA

SparkQA commented Aug 20, 2017

Test build #80879 has finished for PR 19001 at commit 02d8711.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • throw new IOException("Cannot find class " + inputFormatClassName, e);
  • throw new IOException("Unable to find the InputFormat class " + inputFormatClassName, e);

@tejasapatil
Contributor Author

Jenkins retest this please

@SparkQA

SparkQA commented Aug 20, 2017

Test build #80885 has finished for PR 19001 at commit 02d8711.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • throw new IOException("Cannot find class " + inputFormatClassName, e);
  • throw new IOException("Unable to find the InputFormat class " + inputFormatClassName, e);

@SparkQA

SparkQA commented Aug 20, 2017

Test build #80900 has finished for PR 19001 at commit 02d8711.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • throw new IOException("Cannot find class " + inputFormatClassName, e);
  • throw new IOException("Unable to find the InputFormat class " + inputFormatClassName, e);

@dongjoon-hyun
Member

The R failure looks unrelated to this PR.

1. Error: spark.logit (@test_mllib_classification.R#288) -----------------------
java.lang.IllegalArgumentException: requirement failed: The input column stridx_c3082b343085 should have at least two distinct values.

@dongjoon-hyun
Member

Retest this please.

@SparkQA

SparkQA commented Aug 20, 2017

Test build #80908 has finished for PR 19001 at commit 02d8711.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • throw new IOException("Cannot find class " + inputFormatClassName, e);
  • throw new IOException("Unable to find the InputFormat class " + inputFormatClassName, e);

@SparkQA

SparkQA commented Aug 22, 2017

Test build #81005 has finished for PR 19001 at commit a30b6ce.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • throw new IOException("Cannot find class " + inputFormatClassName, e);
  • throw new IOException("Unable to find the InputFormat class " + inputFormatClassName, e);

@tejasapatil
Contributor Author

ping @cloud-fan @gatorsmile

@tejasapatil
Contributor Author

#19080 is improving the distribution semantics in the planner. Will wait for that to get in.

@cloud-fan
Contributor

With the simplified distribution semantics, I think it's much easier to support Hive bucketing. We only need to create a HiveHashPartitioning, implement it similarly to HashPartitioning but without satisfying HashPartitionedDistribution, and then we can avoid shuffles for bucketed Hive tables in many cases like aggregate, repartitionBy, broadcast join, etc.

For non-broadcast joins, we have the potential to support them too, after we make the hash function configurable for HashPartitionedDistribution.
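
A rough sketch of what such a HiveHashPartitioning could look like (an assumption made for illustration, not code from this PR or from Spark):

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning}

// Rough sketch: a partitioning that satisfies ClusteredDistribution (so aggregates
// and repartitionBy over the bucket columns need no shuffle) but never the
// hash-based distribution used for joins, because Hive's hash function differs
// from Spark's Murmur3-based one.
case class HiveHashPartitioning(expressions: Seq[Expression], numPartitions: Int)
  extends Partitioning {

  override def satisfies(required: Distribution): Boolean = required match {
    case c: ClusteredDistribution =>
      expressions.forall(e => c.clustering.exists(_.semanticEquals(e)))
    case _ => super.satisfies(required) // UnspecifiedDistribution, AllTuples, etc.
  }
}
```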

@tejasapatil
Contributor Author

Now that #19080 has been merged to trunk, I am rebasing this PR. A small part of this PR has been put into #20206 and is ready for review.

@tejasapatil
Contributor Author

Jenkins retest this please

@SparkQA

SparkQA commented Jan 12, 2018

Test build #86013 has finished for PR 19001 at commit 7b8a072.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • throw new IOException("Cannot find class " + inputFormatClassName, e);
  • throw new IOException("Unable to find the InputFormat class " + inputFormatClassName, e);

@SparkQA

SparkQA commented Jan 13, 2018

Test build #86074 has finished for PR 19001 at commit 3c367a0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • throw new IOException("Cannot find class " + inputFormatClassName, e);
  • throw new IOException("Unable to find the InputFormat class " + inputFormatClassName, e);

@tejasapatil
Contributor Author

Jenkins retest this please

@SparkQA

SparkQA commented Jan 13, 2018

Test build #86085 has finished for PR 19001 at commit d37eb8b.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tejasapatil
Contributor Author

Jenkins retest this please

@SparkQA

SparkQA commented Jan 13, 2018

Test build #86097 has finished for PR 19001 at commit d37eb8b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tejasapatil
Contributor Author

cc @cloud-fan @gatorsmile @sameeragarwal for review

val orderingExpr = requiredOrdering
  .map(SortOrder(_, Ascending))
  .map(BindReferences.bindReference(_, outputSpec.outputColumns))
SortExec(
Contributor


Removing SortExec here and adding it in the EnsureRequirements strategy will have an impact on many other DataWritingCommands that depend on FileFormatWriter, like CreateDataSourceTableAsSelectCommand. To fix this, code changes are needed in such DataWritingCommand implementations so that they export requiredDistribution and requiredOrdering.
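
For illustration only (not code from this PR), the kind of override each affected command would need, so that EnsureRequirements adds the SortExec for it, might look like this:

```scala
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, SortOrder}

// Hypothetical shape of the change the comment describes: each DataWritingCommand
// that previously relied on FileFormatWriter's internal sort would declare the
// same ordering itself (partition columns, then bucket id, then sort columns).
trait ExportsWriteOrdering {
  def partitionColumns: Seq[Attribute]
  def bucketIdExpression: Option[Expression]
  def sortColumns: Seq[Attribute]

  def requiredOrdering: Seq[SortOrder] =
    (partitionColumns ++ bucketIdExpression ++ sortColumns).map(SortOrder(_, Ascending))
}
```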

}

/**
* How is `requiredOrdering` determined ?
Contributor


Why does the definition of requiredOrdering here differ from that in InsertIntoHiveTable?

newJob.setInputFormat(inputFormat.getClass());

for (int i = 0; i < numBuckets; i++) {
  final FileStatus fileStatus = listStatus[i];
Contributor


This logic depends on the files being listed in the right order; otherwise the RDD partitions to be joined cannot be zipped correctly. The logic here should be fixed to reorder the listed files.
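
One possible way to address this, as a sketch (assuming the usual Hive bucket-file naming convention such as "000000_0"; not code from this PR):

```scala
import org.apache.hadoop.fs.FileStatus

// Illustrative sketch: sort the listed files by the bucket id encoded in the
// Hive bucket file name (e.g. "000001_0" -> bucket 1), instead of relying on
// the order returned by FileSystem.listStatus.
def sortByBucketId(listStatus: Array[FileStatus]): Array[FileStatus] = {
  def bucketId(status: FileStatus): Int =
    status.getPath.getName.takeWhile(_ != '_').toInt // assumes "<bucketId>_<attempt>" names
  listStatus.sortBy(bucketId)
}
```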

Contributor

@cloud-fan left a comment


overall looks good, but we should separate this PR into smaller ones.

right: SparkPlan) extends BinaryExecNode with CodegenSupport {
right: SparkPlan,
requiredNumPartitions: Option[Int] = None,
hashingFunctionClass: Class[_ <: HashExpression[Int]] = classOf[Murmur3Hash])
Contributor


I think this can be done in a followup. For the first version we can just add a HiveHashPartitioning, which can satisfy ClusteredDistribution (saves the shuffle for aggregates) but not HashClusteredDistribution (can't save the shuffle for joins).

@@ -43,7 +44,13 @@ trait RunnableCommand extends Command {
// `ExecutedCommand` during query planning.
lazy val metrics: Map[String, SQLMetric] = Map.empty

def run(sparkSession: SparkSession): Seq[Row]
def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
Contributor


ExecutedCommandExec doesn't call it.

@@ -156,40 +144,14 @@ object FileFormatWriter extends Logging {
statsTrackers = statsTrackers
)

// We should first sort by partition columns, then bucket id, and finally sorting columns.
val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
Contributor


Can we send an individual PR to do this? i.e. do the sorting via requiredOrdering instead of doing it manually.

@HyukjinKwon
Member

Hi all, any updates on this PR?

@tejasapatil
Contributor Author

I will close this for now

@cozos
Contributor

cozos commented Jul 4, 2019

Will work on this continue in the future?
