-
Notifications
You must be signed in to change notification settings - Fork 28.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-9024] Unsafe HashJoin/HashOuterJoin/HashSemiJoin #7480
Conversation
@JoshRosen @rxin Please take a early look. |
Test build #37665 has finished for PR 7480 at commit
|
Test build #37666 has finished for PR 7480 at commit
|
sizeEstimate: Int = 64): HashedRelation = { | ||
|
||
// TODO: Use BytesToBytesMap. | ||
val hashTable = new JavaHashMap[UnsafeRow, CompactBuffer[UnsafeRow]](sizeEstimate) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it might be ok to just build this using a java hashmap first, and then build a giant byte array from this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm thinking we can re-order the values in BytesToBytesMap during serialization.
Test build #37855 has finished for PR 7480 at commit
|
public boolean equals(Object other) { | ||
if (other instanceof UnsafeRow) { | ||
UnsafeRow o = (UnsafeRow) other; | ||
return ByteArrayMethods.arrayEquals(baseObject, baseOffset, o.baseObject, o.baseOffset, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that we should check whether the rows' sizeInBytes
are equal before attempting to compare their contents.
Test build #37857 has finished for PR 7480 at commit
|
|
||
val values = hashTable.get(unsafeKey) | ||
// Return GenericInternalRow to work with other JoinRow, which | ||
// TODO(davies): return UnsafeRow once we have UnsafeJoinRow. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we're not going to implement this as part of this patch, then let's make sure to file a followup JIRA under the Tungsten umbrella.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's already on the last week's TODO-list
@@ -83,12 +83,23 @@ abstract class UnsafeProjection extends Projection { | |||
} | |||
|
|||
object UnsafeProjection { | |||
def canSupport(schema: StructType): Boolean = canSupport(schema.fields.map(_.dataType)) | |||
def canSupport(types: Seq[DataType]): Boolean = types.forall(UnsafeColumnWriter.canEmbed(_)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could even add a canSupport(exprs: Seq[Expression])
to be able to save some characters elsewhere.
|
||
def this() = this(null, null, null) // Needed for serialization | ||
|
||
// UnsafeProjection is not thread safe |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you elaborate on why instances of UnsafeProjection are not thread-safe? I don't see any mention of this in the Scaladoc for UnsafeProjection
, so we should probably update it to make any thread-safety concerns clearer.
Does UnsafeHashedRelation
have to be thread-safe? I thought it was only used in the context of a single task.
c => if (!c.outputsUnsafeRows) ConvertToUnsafe(c) else c | ||
// If this operator's children produce both unsafe and safe rows, | ||
// convert everything unsafe rows if all the schema of them are support by UnsafeRow | ||
if (operator.children.forall(c => UnsafeProjection.canSupport(c.schema))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ping @JoshRosen
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change looks good to me.
@JoshRosen Thanks, I will merge this once Jenkins is happy. |
Test build #37979 has finished for PR 7480 at commit
|
Test build #37985 has finished for PR 7480 at commit
|
Test build #37999 has finished for PR 7480 at commit
|
Test build #38028 has finished for PR 7480 at commit
|
Test build #1154 has finished for PR 7480 at commit
|
Test build #1155 has finished for PR 7480 at commit
|
Test build #1156 has finished for PR 7480 at commit
|
Test build #1160 has finished for PR 7480 at commit
|
public boolean anyNull() { | ||
return BitSetMethods.anySet(baseObject, baseOffset, bitSetWidthInBytes); | ||
return BitSetMethods.anySet(baseObject, baseOffset, bitSetWidthInBytes / 8); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add a unit test for this? i'd imagine it affects correctness
Test build #1171 has finished for PR 7480 at commit
|
Merged this into master, will address these comments in follow up PR. |
if (supportUnsafe) { | ||
UnsafeProjection.create(self.schema) | ||
} else { | ||
new Projection { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think you just want scala's "identity" here
This PR introduce BytesToBytesMap to UnsafeHashedRelation, use it in executor for better performance. It serialize all the key and values from java HashMap, put them into a BytesToBytesMap while deserializing. All the values for a same key are stored continuous to have better memory locality. This PR also address the comments for #7480 , do some clean up. Author: Davies Liu <davies@databricks.com> Closes #7592 from davies/unsafe_map2 and squashes the following commits: 42c578a [Davies Liu] Merge branch 'master' of github.com:apache/spark into unsafe_map2 fd09528 [Davies Liu] remove thread local cache and update docs 1c5ad8d [Davies Liu] fix test 5eb1b5a [Davies Liu] address comments in #7480 46f1f22 [Davies Liu] fix style fc221e0 [Davies Liu] use BytesToBytesMap for broadcast join
This PR introduce unsafe version (using UnsafeRow) of HashJoin, HashOuterJoin and HashSemiJoin, including the broadcast one and shuffle one (except FullOuterJoin, which is better to be implemented using SortMergeJoin).
It use HashMap to store UnsafeRow right now, will change to use BytesToBytesMap for better performance (in another PR).