
[SPARK-39607][SQL][DSV2] Distribution and ordering support V2 function in writing #36995

Closed

Conversation

pan3793
Member

@pan3793 pan3793 commented Jun 26, 2022

What changes were proposed in this pull request?

Add a new feature: make distribution and ordering support V2 functions in writing.

Currently, the rule V2Writes supports converting ApplyTransform to TransformExpression (unevaluable); this PR makes V2Writes also convert TransformExpression to ApplyFunctionExpression/Invoke/StaticInvoke (evaluable).

Why are the changes needed?

SPARK-33779 introduced an API for DSv2 writers to declare the required distribution and ordering of data before writing. With SPARK-34026, Spark can translate IdentityTransform into a Catalyst expression in those distribution and ordering expressions.

But some databases, such as ClickHouse, allow a table partition to be defined by an expression, e.g. PARTITIONED BY num % 10, so it's useful to also support translating ApplyTransform, letting Spark organize the data to fit the target storage's requirements before writing.
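For illustration, a connector could request such a distribution roughly like this (a minimal sketch; ClickHouseStyleWrite and the "mod" V2 function are hypothetical names, assuming the connector exposes "mod" through its FunctionCatalog):

```scala
import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
import org.apache.spark.sql.connector.expressions.{Expression, Expressions, SortOrder}
import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering

// Hypothetical Write for a table defined as PARTITIONED BY num % 10; "mod" is assumed
// to be a V2 ScalarFunction registered in the connector's FunctionCatalog.
class ClickHouseStyleWrite extends RequiresDistributionAndOrdering {
  override def requiredDistribution(): Distribution = {
    val partitionExpr: Expression =
      Expressions.apply("mod", Expressions.column("num"), Expressions.literal(10))
    // Cluster rows so that all records of one target partition land in the same Spark task.
    Distributions.clustered(Array(partitionExpr))
  }

  override def requiredOrdering(): Array[SortOrder] = Array.empty

  override def requiredNumPartitions(): Int = 0 // 0 = let Spark decide
}
```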

Does this PR introduce any user-facing change?

Yes, users can use a V2 function as a partition transform in a DSv2 connector.

How was this patch tested?

Unit tests added.

@github-actions github-actions bot added the SQL label Jun 26, 2022
@pan3793
Member Author

pan3793 commented Jun 27, 2022

@AmplabJenkins

Can one of the admins verify this patch?


    case _ =>
      query
  }

  def resolveTransformExpression(expr: Expression): Expression = expr.transform {
    case TransformExpression(scalarFunc: ScalarFunction[_], arguments, Some(numBuckets)) =>
Contributor

I can't recall the details now. When do we need to translate a v2 transform to TransformExpression?

Member Author

@pan3793 pan3793 Jul 8, 2022

TransformExpression was introduced in SPARK-37377 (#35657) to check input partition compatibility; it is constructed in V2ScanPartitioningAndOrdering.

Member Author

@pan3793 pan3793 Jul 15, 2022

SPARK-37377 (#35657) changed the common code shared by V2 reading & writing to convert Transform into TransformExpression.
If desired, we could skip TransformExpression and resolve Transform to ApplyFunctionExpression / Invoke / StaticInvoke directly in V2 writing, but that would make V2 reading & writing share less code.

@pan3793
Member Author

pan3793 commented Jul 15, 2022

The rule V2Writes runs in the optimization phase but does things that arguably should happen in the analysis phase, such as function resolution and implicit type casts. The current approach tries to minimize the code change; I'm not sure if it's the right direction.

@pan3793
Member Author

pan3793 commented Jul 20, 2022

@cloud-fan @sunchao sorry for bothering you again, please take a look when you have time.

Member

@sunchao sunchao left a comment

Looks OK to me. I'm just a bit confused about how the transforms are going to be used in the write path & when shuffling data.

cc @aokolnychyi too

import org.apache.spark.sql.connector.distributions._
import org.apache.spark.sql.connector.write.{RequiresDistributionAndOrdering, Write}
import org.apache.spark.sql.errors.QueryCompilationErrors

object DistributionAndOrderingUtils {

  def prepareQuery(write: Write, query: LogicalPlan): LogicalPlan = write match {
  def prepareQuery(
Member

Hmm, I wonder how the write works with transforms such as bucket. For example, suppose the required distribution is bucket(col, 100). Spark currently computes the partition (bucket) ID as murmur_hash(bucket(col, 100)) pmod 100, so the value of col is essentially hashed twice. I'm not sure whether this breaks any assumption on the V2 data source side, or whether it has any effect on the hash key distribution.

Member Author

Spark currently computes the partition (bucket) ID as murmur_hash(bucket(col, 100)) pmod 100

That only happens on the V1 write path. In V2, bucket is resolved as BucketTransform during analysis and converted to an evaluable Catalyst expression (ApplyFunctionExpression/Invoke/StaticInvoke) here, so I don't think that concern applies.
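For reference, a minimal sketch of that conversion step (not the PR's exact code; when the ScalarFunction declares the magic "invoke" method, Invoke/StaticInvoke would be produced instead of the ApplyFunctionExpression fallback shown here):

```scala
import org.apache.spark.sql.catalyst.expressions.{ApplyFunctionExpression, Expression, Literal, TransformExpression}
import org.apache.spark.sql.connector.catalog.functions.ScalarFunction

// Rewrite unevaluable TransformExpressions into evaluable ones. ApplyFunctionExpression
// calls ScalarFunction.produceResult at runtime; for a bucket transform the number of
// buckets is prepended as an extra literal argument.
def toEvaluable(expr: Expression): Expression = expr.transform {
  case TransformExpression(scalarFunc: ScalarFunction[_], args, numBucketsOpt) =>
    val allArgs = numBucketsOpt.map(n => Literal(n) +: args).getOrElse(args)
    ApplyFunctionExpression(scalarFunc, allArgs)
}
```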

Member

Hmm, yes, it will be converted to ApplyFunctionExpression etc. But in the V2 write path, these partition transforms will be used in RepartitionByExpression and then converted into ShuffleExchangeExec during physical planning, right?

Checking the code path: ApplyFunctionExpression / Invoke / StaticInvoke, etc. will first be passed to RepartitionByExpression as the partitionExpressions field, then to HashPartitioning, RangePartitioning, etc. in HasPartitionExpressions, and eventually used in places like HashPartitioning.partitionIdExpression, where the partition ID computation I mentioned above happens.
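For reference, the relevant bit of HashPartitioning is essentially (paraphrased):

```scala
// Inside HashPartitioning (paraphrased): the shuffle partition ID is the murmur3 hash of
// the partition expressions, pmod the number of partitions. If one of those expressions
// is itself a bucket transform, its output gets hashed a second time here.
def partitionIdExpression: Expression =
  Pmod(new Murmur3Hash(expressions), Literal(numPartitions))
```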

Member Author

@pan3793 pan3793 Aug 2, 2022

Thanks for explaining, I learned something. After reading the code, I think you are right that "the value of col is essentially hashed twice", but I don't think it causes correctness issues, because it still guarantees that the same values are clustered into the same partition.
One example is Hive bucketing: in V1WritesUtils#getWriterBucketSpec, either HiveHash or HashPartitioning#partitionIdExpression can be used to construct bucketIdExpression.

Member

I'm just wondering whether we should enforce stronger semantics on bucketing. Suppose we build bucketed table support for the DSv2 file source on top of this mechanism: does that mean a particular row could be hashed to a different bucket ID than in V1? What if someone first writes a bucketed table using V2 and then reads it back in V1, and perhaps does a bucket join with another V1 bucketed table? I wonder if that could cause incorrect results.

Member Author

Given that different storage systems usually define their own hash functions and sharding (bucketing) rules, I don't think we can make bucketed tables of all data sources fully compatible with each other; e.g. Spark, Hive, and Iceberg use totally different hash algorithms for bucketing.

We can make DSv1 file bucketed tables compatible with V2 file bucketed tables, and Hive bucketed tables compatible with V2 Hive bucketed tables, but a file-source bucketed table cannot be compatible with Hive (neither V1 nor V2) since they use different hash algorithms.

In detail, since V2SessionCatalog extends FunctionCatalog, to make V2 file bucketed tables compatible with V1 we can introduce and register a SparkBucket V2 function in V2SessionCatalog that uses the same hash algorithm as V1. For Hive tables, just register a similar HiveBucket V2 function using the Hive hash algorithm in HiveCatalog.
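A rough sketch of what such a SparkBucket function could look like (a hypothetical class, not existing Spark code; it reuses the murmur3 hashing that V1 file-source bucketing is based on, here for int columns only):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.functions.ScalarFunction
import org.apache.spark.sql.types.{DataType, DataTypes}
import org.apache.spark.unsafe.hash.Murmur3_x86_32

// Hypothetical "spark_bucket" V2 function that hashes the same way as Spark's V1
// file-source bucketing (murmur3 with seed 42, pmod number of buckets), so V1 and V2
// writes agree on bucket IDs. A real version would cover more types and be registered
// via an UnboundFunction in V2SessionCatalog.
case class SparkBucketInt(numBuckets: Int) extends ScalarFunction[Integer] {
  override def inputTypes(): Array[DataType] = Array(DataTypes.IntegerType)
  override def resultType(): DataType = DataTypes.IntegerType
  override def name(): String = "spark_bucket"
  override def canonicalName(): String = s"spark_bucket($numBuckets, int)"

  override def produceResult(input: InternalRow): Integer = {
    val hash = Murmur3_x86_32.hashInt(input.getInt(0), 42)
    ((hash % numBuckets) + numBuckets) % numBuckets // positive modulus, like Pmod
  }
}
```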

Contributor

I think @sunchao brings up a valid point that is easy to overlook. We have to make sure Spark writes to Hive tables in the same way no matter whether the v1 or v2 path is being used.

Would it be correct to say we have this issue because partitionIdExpression in HashPartitioning is used both for generating bucket IDs in Hive tables as well as for producing partition IDs for writing tasks? Can we use different mechanisms?

Member Author

Since SPARK-32709 (#33432), Spark allows using different hash expressions for generating partition IDs and bucket IDs.

Contributor

Suppose we build bucketed table support for the DSv2 file source on top of this mechanism: does that mean a particular row could be hashed to a different bucket ID than in V1? What if someone first writes a bucketed table using V2 and then reads it back in V1, and perhaps does a bucket join with another V1 bucketed table?

@sunchao, coming back to the use case you mentioned above. I think the bucket ID will always be the same as long as the task writer respects the table bucket spec and Spark shuffles all records that are supposed to land in one bucket to one task. Like @pan3793 said, I suppose the V2 file source will request a distribution using a V2 function that wraps the internal Spark hash function. That should guarantee that all records for the same bucket land in one task. As long as the task writer uses the correct bucket expression (based on the table definition), we should be good, right?

You are right that the partition/task ID during writes for a particular row may differ between V1 and V2 because of double hashing. But does it actually matter, though?

Member

Sorry for the late reply here. Yes, I think it should be OK as long as the task writer uses the bucket expression to decide which bucket each input record should go to. There will be some extra work to achieve V1 and V2 compatibility, but that's not really relevant to this PR now.

    case write: RequiresDistributionAndOrdering =>
      val numPartitions = write.requiredNumPartitions()

      val distribution = write.requiredDistribution match {
        case d: OrderedDistribution => toCatalystOrdering(d.ordering(), query)
Contributor

What was the behavior before this PR? Do we fail to translate the v2 function, or do we fail at runtime complaining that some expression can't be evaluated?

Member Author

The former; the query fails with org.apache.spark.sql.AnalysisException: ${expr} is not currently supported.

@aokolnychyi
Contributor

I'd love to take a look tomorrow as well (not blocking).


@cloud-fan
Contributor

In general, this feature looks reasonable, but it's interesting to discuss the behavior of "v2 write required distribution" with this new feature.

Let's assume the required distribution is ClusteredDistribution; its doc says:

/**
 * A distribution where tuples that share the same values for clustering expressions are co-located
 * in the same partition.
 *
 * @since 3.2.0
 */
@Experimental
public interface ClusteredDistribution extends Distribution

This means, the clustering expressions are the keys, and Spark makes sure records with the same keys go to the same partition. What Spark does is: for each record, calculate the keys, hash the keys and assign a partition ID for the record based on the hash of the keys.

How can we use this feature to implement bucket writing? We can use the expression (a v2 function) that calculates the bucket ID as the clustering expressions. Then Spark will make sure records with the same bucket ID will be in the same partition. However, the problem of this approach is low parallelism (at most number of buckets).

A different approach is to use the bucket columns as the clustering expressions. Spark will make sure records with the same bucket columns values will be in the same partition. Then the v2 write can require a local sort with bucket id (a v2 function) so that records with the same bucket ID will be grouped together.
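Roughly, the two options could be expressed like this on the connector side (a sketch using the public connector expression factories; bucket(100, col) stands in for the table's bucket spec):

```scala
import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
import org.apache.spark.sql.connector.expressions.{Expression, Expressions, SortDirection, SortOrder}

// Option 1: cluster by the bucket ID itself -- fewest files, but parallelism is capped
// at the number of buckets.
val byBucketId: Distribution =
  Distributions.clustered(Array[Expression](Expressions.bucket(100, "col")))

// Option 2: cluster by the raw column and only require a local sort by bucket ID --
// better parallelism, and each task still groups rows of the same bucket together.
val byColumn: Distribution =
  Distributions.clustered(Array[Expression](Expressions.column("col")))
val localSortByBucketId: Array[SortOrder] =
  Array(Expressions.sort(Expressions.bucket(100, "col"), SortDirection.ASCENDING))
```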

That said, I think most users will not use bucket transform as the clustering expressions. If they do, it's their choice and Spark won't do anything wrong.

What do you think? @sunchao @aokolnychyi

@sunchao
Member

sunchao commented Aug 24, 2022

How can we use this feature to implement bucket writing? We can use the expression (a v2 function) that calculates the bucket ID as the clustering expressions. Then Spark will make sure records with the same bucket ID will be in the same partition. However, the problem of this approach is low parallelism (at most number of buckets).

@cloud-fan I think you raised a good point. With the double hashing mentioned above, the parallelism could even be less than the number of buckets due to collisions (but I guess this is a minor thing since the chance is low). Even if the actual number of Spark tasks is much larger than the number of buckets, most of the tasks will receive empty input in this scenario.

A different approach is to use the bucket columns as the clustering expressions. Spark will make sure records with the same bucket columns values will be in the same partition. Then the v2 write can require a local sort with bucket id (a v2 function) so that records with the same bucket ID will be grouped together.

This means it now relies on Spark's hash function for bucketing though, which could be different from other engines. I think it would cause compatibility issues, right?

That said, I think most users will not use bucket transform as the clustering expressions.

Hmm, I'm not sure whether this is true. @aokolnychyi may know more from the Iceberg side.

@sunchao
Member

sunchao commented Aug 24, 2022

In general, say a V2 transform function maps a key to a value; given a set of keys, the value space should always be <= the key space. Therefore, it seems better for Spark to shuffle the records based on the keys, while still using the provided V2 transform function to determine the actual partition/bucket a record belongs to.

Maybe we can introduce a special case for KeyGroupedPartitioning in ShuffleExchangeExec, which only considers the keys, rather than the values after V2 transform function evaluation, when computing the hash and deciding the partition ID for an input record.

@cloud-fan
Contributor

cloud-fan commented Aug 25, 2022

This means it now relies on Spark's hash function for bucketing though, which could be different from other engines.

Let's think about it this way: the v2 data source only needs Spark to locally sort the data by bucket ID, which means the required ordering will be a v2 function that generates the bucket ID. Then the v2 writer generates the bucket ID again using the same v2 function during data writing. Or the v2 writer can just use a hash map of open file handlers so that Spark doesn't need to sort the data at all. The extra clustering is only there to reduce the number of files we write out.
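The hash-map idea could look roughly like this on the writer side (a sketch; BucketedDataWriter, BucketedCommitMessage, and the constructor callbacks are hypothetical):

```scala
import java.util.{HashMap => JHashMap}
import scala.jdk.CollectionConverters._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}

// Hypothetical commit message bundling the per-bucket results.
case class BucketedCommitMessage(children: Seq[WriterCommitMessage]) extends WriterCommitMessage

class BucketedDataWriter(
    bucketIdOf: InternalRow => Int,                   // evaluates the bucket V2 function
    newBucketWriter: Int => DataWriter[InternalRow])  // opens a file for a given bucket
  extends DataWriter[InternalRow] {

  private val openWriters = new JHashMap[Int, DataWriter[InternalRow]]()

  // No sort needed: each record is routed to the writer of its own bucket.
  override def write(record: InternalRow): Unit =
    openWriters.computeIfAbsent(bucketIdOf(record), b => newBucketWriter(b)).write(record)

  override def commit(): WriterCommitMessage =
    BucketedCommitMessage(openWriters.values().asScala.map(_.commit()).toSeq)

  override def abort(): Unit = openWriters.values().asScala.foreach(_.abort())

  override def close(): Unit = openWriters.values().asScala.foreach(_.close())
}
```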

The Spark hash algorithm only matters when reading bucketed tables and trying to avoid shuffles. I think this is handled well already. Spark will shuffle a v2 table scan even if it's bucketed if the other side of the join is a normal table scan with shuffle.

@sunchao
Member

sunchao commented Aug 25, 2022

Makes sense. So you mean V2 data source providers can report only the clustered columns through RequiresDistributionAndOrdering while still using the V2 transform functions on those columns in the V2 writer tasks.

@cloud-fan
Contributor

@sunchao yes

@aokolnychyi
Contributor

aokolnychyi commented Aug 25, 2022

This means, the clustering expressions are the keys, and Spark makes sure records with the same keys go to the same partition. What Spark does is: for each record, calculate the keys, hash the keys and assign a partition ID for the record based on the hash of the keys.

How can we use this feature to implement bucket writing? We can use the expression (a v2 function) that calculates the bucket ID as the clustering expressions. Then Spark will make sure records with the same bucket ID will be in the same partition. However, the problem of this approach is low parallelism (at most number of buckets).

A different approach is to use the bucket columns as the clustering expressions. Spark will make sure records with the same bucket columns values will be in the same partition. Then the v2 write can require a local sort with bucket id (a v2 function) so that records with the same bucket ID will be grouped together.

@cloud-fan, I agree with your summary. It seems to me like a classic trade-off between fewer files (clustering by bucket ID) and better parallelism (local sort by bucket ID). I believe the current Spark API is flexible enough so that data sources can request either of those depending on their internal logic. The third alternative is to leverage an ordered distribution by bucket ID + some other key. In that case, Spark will do skew estimation while determining ranges. Each option has its own benefits and drawbacks so just allowing data sources to pick what works best should be enough.

To sum up, I feel the logic in this PR works and the existing API should cover all discussed cases.

What do you think, @cloud-fan @sunchao?

@aokolnychyi
Contributor

I guess AQE can also help us coalesce and split overly large partitions during writes.

@pan3793
Member Author

pan3793 commented Aug 28, 2022

I guess AQE can also help us coalesce and split overly large partitions during writes.

This should be covered by SPARK-37523.

Thanks @cloud-fan @sunchao @aokolnychyi for the review and discussion. Is there anything I can do before this PR gets in?

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 438233d Aug 29, 2022
@sunchao
Member

sunchao commented Aug 29, 2022

late LGTM too, sorry for the delay on this PR @pan3793 !

sunchao pushed a commit that referenced this pull request Sep 7, 2022
…ribution/ordering

### What changes were proposed in this pull request?

This PR adapts `V2ExpressionUtils` to support arbitrary transforms with multiple args that are either references or literals.

### Why are the changes needed?

After PR #36995, data sources can request distribution and ordering that reference v2 functions. If a data source needs a transform with multiple input args or a transform where not all args are references, Spark will throw an exception.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This PR adapts the test added recently in PR #36995.

Closes #37749 from aokolnychyi/spark-40295.

Lead-authored-by: aokolnychyi <aokolnychyi@apple.com>
Co-authored-by: Anton Okolnychyi <aokolnychyi@apple.com>
Signed-off-by: Chao Sun <sunchao@apple.com>