[SPARK-38325][SQL] ANSI mode: avoid potential runtime error in HashJoin.extractKeyExprAt() #35659

Closed
wants to merge 2 commits into from

Conversation

gengliangwang
Member

What changes were proposed in this pull request?

SubqueryBroadcastExec retrieves the partition key from the broadcast results based on the type of HashedRelation returned. If the key is packed inside a Long, we extract it through bitwise operations and cast it to Byte/Short/Int if necessary.

Under ANSI mode, this cast can throw an overflow error at runtime. This PR fixes that by performing the cast with ansiEnabled = false.
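
For readers unfamiliar with the packing step, here is a minimal plain-Scala sketch of how several small integral keys can share one Long. It is an illustration only, not Spark's code: the object name `PackSketch`, the `pack` helper, and the Byte/Int/Short key layout are assumptions chosen for the example.

```scala
// Illustration only: plain Scala, not Spark's HashJoin implementation.
object PackSketch {
  // Assumed key widths in bytes, in key order: Byte, Int, Short (7 bytes in total).
  val widths: Seq[Int] = Seq(1, 4, 2)

  // Pack the keys left to right: shift the accumulator left by the width of
  // the next key, then OR in that key's low bits.
  def pack(b: Byte, i: Int, s: Short): Long = {
    val values = Seq(b.toLong, i.toLong, s.toLong)
    values.zip(widths).foldLeft(0L) { case (acc, (v, w)) =>
      val mask = (1L << (w * 8)) - 1 // low w * 8 bits set
      (acc << (w * 8)) | (v & mask)
    }
  }
}
```

With this layout the first key ends up in the highest occupied bits and the last key in the lowest ones, so negative keys contribute their two's-complement bit patterns rather than their signed values.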

Why are the changes needed?

Bug fix

Does this PR introduce any user-facing change?

Yes. It avoids a potential runtime error in dynamic pruning under ANSI mode.

How was this patch tested?

Unit tests.

@gengliangwang
Member Author

cc @entong as well

child = BoundReference(0, LongType, nullable = false),
dataType = keys(index).dataType,
timeZoneId = Option(conf.sessionLocalTimeZone),
ansiEnabled = false)
Contributor

So do we expect to get a null value if an overflow happens?

cc @c21

Member Author

The default (non-ANSI) cast keeps only the lower bits when an overflow happens.
For example:

> select cast(1025L as byte);
1
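
The same contrast can be sketched in plain Scala, outside Spark. This is not Spark's Cast implementation: `truncateToByte` mirrors the low-bits behavior shown in the SQL example, and `checkedToByte` is a hypothetical helper that imitates an overflow-checking conversion.

```scala
// Illustration only: plain Scala, not Spark's Cast expression.
object NarrowingSketch {
  // Truncating narrowing: keeps the low 8 bits, like Java's `(byte) longValue`.
  def truncateToByte(v: Long): Byte = v.toByte

  // Hypothetical overflow-checking narrowing: rejects values outside the Byte range.
  def checkedToByte(v: Long): Byte = {
    if (v < Byte.MinValue || v > Byte.MaxValue) {
      throw new ArithmeticException(s"$v is out of range for Byte")
    }
    v.toByte
  }
}
```

The relevance to unpacking is that the masked bits of a negative Byte such as -1 read back as 255 when held in a Long. A truncating conversion recovers -1 from 255, while an overflow-checking one rejects it.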

Contributor

Sorry, just for my understanding (without running the unit test myself): why do we expect an overflow to happen here? Shouldn't rewriteKeyExpr() above already guarantee that we only rewrite keys to long when all keys fit into a long type?

Member Author

@c21 This PR is to avoid exceptions on overflow.
If you run HashedRelationSuite with ANSI mode on, there will be an overflow error.

Contributor

@gengliangwang - got it, thanks for the explanation.

Contributor

Ah, I see. So this algorithm expects the equivalent of the Java code byte b = (byte) long_value, which is the same as the non-ANSI Cast behavior.

} else {
val shiftedBits =
keys.slice(index + 1, keys.size).map(_.dataType.defaultSize * 8).sum
val mask = (1L << (keys(index).dataType.defaultSize * 8)) - 1
// build the schema for unpacking the required key
cast(BitwiseAnd(
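
The diff context above is cut off after `cast(BitwiseAnd(`, so the following is only a plain-Scala sketch of what the shift and mask arithmetic works out to. The `keySizes` values stand in for `keys(i).dataType.defaultSize` with an assumed Byte/Int/Short key layout, and the use of an unsigned right shift before the mask is an assumption, not something visible in the snippet.

```scala
// Illustration only: plain Scala, with assumed key sizes and shift direction.
object UnpackSketch {
  // Hypothetical stand-in for keys(i).dataType.defaultSize: Byte, Int, Short.
  val keySizes: Seq[Int] = Seq(1, 4, 2)

  // Raw bits of the key at `index`, still held in a Long and not yet narrowed.
  def rawBits(packed: Long, index: Int): Long = {
    // Bits occupied by the keys that come after `index`.
    val shiftedBits = keySizes.slice(index + 1, keySizes.size).map(_ * 8).sum
    // Mask with the low (size * 8) bits set.
    val mask = (1L << (keySizes(index) * 8)) - 1
    (packed >>> shiftedBits) & mask
  }
}
```

For this layout the shifts come out as 48, 16, and 0 bits and the masks as 0xFF, 0xFFFFFFFF, and 0xFFFF. The result is always non-negative, which is why a key that was originally negative no longer fits its target type until it is narrowed by truncation.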

Do we need to check cast() used in other places to see if they have the same issue?

Member Author

Yes, I checked and the other usages look fine.

Cast(
child = BoundReference(0, LongType, nullable = false),
dataType = keys(index).dataType,
timeZoneId = Option(conf.sessionLocalTimeZone),
Contributor

It seems that rewriteKeyExpr() above does not support timezone-related data types yet (Line 716). Shall we add an assertion in this method that the key data types are IntegralType and that the sum of the key sizes is no larger than 8 bytes?
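
A check along these lines could look roughly like the sketch below. It is a guess at the shape of such an assertion, not the code that was added to the PR: `KeyType` and its case objects are hypothetical stand-ins for Spark's data types, and `canUnpack` and `checkKeys` are made-up names.

```scala
// Illustration only: hypothetical types, not Spark's DataType hierarchy.
object KeyCheckSketch {
  sealed abstract class KeyType(val defaultSize: Int, val integral: Boolean)
  case object ByteKey extends KeyType(1, true)
  case object ShortKey extends KeyType(2, true)
  case object IntKey extends KeyType(4, true)
  case object LongKey extends KeyType(8, true)
  case object StringKey extends KeyType(20, false)

  // True when every key is integral and all keys together fit into 8 bytes.
  def canUnpack(keys: Seq[KeyType]): Boolean =
    keys.forall(_.integral) && keys.map(_.defaultSize).sum <= 8

  // Fails fast if the keys could not have been packed into a single Long.
  def checkKeys(keys: Seq[KeyType]): Unit =
    assert(canUnpack(keys), s"Keys cannot be packed into a Long: $keys")
}
```

Checking both conditions up front documents the precondition that the shift and mask arithmetic relies on.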

Member Author

Sure, added assertion.

Member Author

@gengliangwang gengliangwang Feb 25, 2022

I am keeping the timeZoneId here since we may support DateType/TimestampType in the future.

val unsafeProj = UnsafeProjection.create(packed)
val packedKeys = unsafeProj(row)

Seq((0, ByteType), (1, IntegerType), (2, ShortType)).foreach { case (i, dt) =>
Contributor

nit: is it possible to add a test with overflow values as well? Non-blocking comment.

Member Author

This case can already cause overflow on unpacking.

Contributor

@c21 c21 left a comment

+1, LGTM. Thanks @gengliangwang for fixing this.

@gengliangwang
Member Author

@cloud-fan @entong @c21 Thanks for the review.
Merging to master/3.2

gengliangwang added a commit that referenced this pull request Feb 25, 2022
…in.extractKeyExprAt()

Closes #35659 from gengliangwang/fixHashJoin.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit 29eca8c)
Signed-off-by: Gengliang Wang <gengliang@apache.org>
kazuyukitanimura pushed a commit to kazuyukitanimura/spark that referenced this pull request Aug 10, 2022
…in.extractKeyExprAt()
