[SPARK-24999][SQL]Reduce unnecessary 'new' memory operations #21968
Conversation
val loc = new map.Location // this could be allocated in stack
binaryMap.safeLookup(unsafeKey.getBaseObject, unsafeKey.getBaseOffset,
  unsafeKey.getSizeInBytes, loc, unsafeKey.hashCode())
val loc = map.lookup(unsafeKey.getBaseObject, unsafeKey.getBaseOffset,
IIUC, this change makes this part thread-unsafe. Is it OK?
This lookup is safe, and it is different from get(key: InternalRow).
Before this PR, loc is allocated at each call of getValue(). After this PR, loc will be shared within each binaryMap that is passed to a constructor of UnsafeHashedRelation. Is this behavior change safe?
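To make the tradeoff being reviewed concrete, here is a minimal, hypothetical Java sketch (IntMap and Location are illustrative toy classes, not Spark's API): reusing one mutable location object per map saves an allocation on every lookup, but each lookup overwrites the previous result, so concurrent lookups on the same map instance become unsafe.

```java
import java.util.HashMap;
import java.util.Map;

class Location {
    boolean found;
    int value;
}

class IntMap {
    private final Map<Integer, Integer> data = new HashMap<>();
    // Shared across all lookups on this map: no per-call allocation,
    // but concurrent lookups on the same instance would race on it.
    private final Location loc = new Location();

    void put(int k, int v) { data.put(k, v); }

    Location lookup(int k) {
        Integer v = data.get(k);
        loc.found = (v != null);
        loc.value = (v != null) ? v : -1;
        return loc; // caller must consume the result before the next lookup
    }
}

public class Main {
    public static void main(String[] args) {
        IntMap m = new IntMap();
        m.put(1, 10);
        Location a = m.lookup(1);
        Location b = m.lookup(2);
        // a and b are the SAME object: the first result was overwritten.
        System.out.println((a == b) + " " + a.found);
    }
}
```

This is why the question above matters: sharing loc is fine if each binaryMap is only probed by one thread at a time.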
@@ -44,6 +44,12 @@ class RowBasedHashMapGenerator(
groupingKeySchema, bufferSchema) {

override protected def initializeAggregateHashMap(): String = {
val numVarLenFields = groupingKeys.map(_.dataType).count {
Nit: can't this just be .count(!UnsafeRow.isFixedLength(_))?
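For readers outside Scala, the nit is only about how to phrase the count of grouping-key types that are not fixed-length. A hedged Java sketch of the same computation (isFixedLength here is a toy stand-in for UnsafeRow.isFixedLength, with made-up type names):

```java
import java.util.List;
import java.util.Set;

public class Main {
    // Toy stand-in for UnsafeRow.isFixedLength: primitive-width types are
    // fixed-length; string/binary payloads are variable-length.
    static boolean isFixedLength(String dataType) {
        return Set.of("int", "long", "double", "boolean").contains(dataType);
    }

    public static void main(String[] args) {
        List<String> keyTypes = List.of("int", "string", "long", "binary");
        // Same shape as the Scala snippet: count the NON-fixed-length types.
        long numVarLenFields =
            keyTypes.stream().filter(t -> !isFixedLength(t)).count();
        System.out.println(numVarLenFields);
    }
}
```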
Force-pushed e223446 to e0748e1
@@ -48,6 +48,8 @@ class RowBasedHashMapGenerator(
val keySchema = ctx.addReferenceObj("keySchemaTerm", groupingKeySchema)
val valueSchema = ctx.addReferenceObj("valueSchemaTerm", bufferSchema)

val numVarLenFields = groupingKeys.map(_.dataType).count(!UnsafeRow.isFixedLength(_))
Do not remove the TODO comment below.
@@ -141,9 +141,6 @@ class RowBasedHashMapGenerator(
| if (buckets[idx] == -1) {
| if (numRows < capacity && !isBatchFull) {
| // creating the unsafe for new entry
| org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter
| = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(
| ${groupingKeySchema.length}, ${numVarLenFields * 32});
| agg_rowWriter.reset(); //TODO: investigate if reset or zeroout are actually needed
| agg_rowWriter.zeroOutNullBytes();
btw, if groupingKeySchema has no nullable field, can we drop agg_rowWriter.zeroOutNullBytes()?
The change looks reasonable to me, so can you trigger tests? @gatorsmile @cloud-fan @hvanhovell
Test build #4294 has finished for PR 21968 at commit
Force-pushed e0748e1 to e4cec60
cc @maropu, @kiszk, @cloud-fan
can we do the same thing for the columnar one?
cc @cloud-fan I'm sorry, I look
@@ -141,11 +151,8 @@ class RowBasedHashMapGenerator(
| if (buckets[idx] == -1) {
| if (numRows < capacity && !isBatchFull) {
| // creating the unsafe for new entry
Remove or update this comment?
ok, updated. thanks.
@@ -141,11 +151,8 @@ class RowBasedHashMapGenerator(
| if (buckets[idx] == -1) {
| if (numRows < capacity && !isBatchFull) {
| // creating the unsafe for new entry
| org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter
| = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(
| ${groupingKeySchema.length}, ${numVarLenFields * 32});
| agg_rowWriter.reset(); //TODO: investigate if reset or zeroout are actually needed
I think the reset and zero-out are now needed, so maybe remove this TODO?
ok.
@@ -48,6 +48,12 @@ class RowBasedHashMapGenerator(
val keySchema = ctx.addReferenceObj("keySchemaTerm", groupingKeySchema)
val valueSchema = ctx.addReferenceObj("valueSchemaTerm", bufferSchema)

val numVarLenFields = groupingKeys.map(_.dataType).count {
groupingKeys.map(_.dataType).count(dt => !UnsafeRow.isFixedLength(dt))
ok, thanks.
@@ -130,6 +134,12 @@ class RowBasedHashMapGenerator(
}
}.mkString(";\n")

val nullByteWriter = if (groupingKeySchema.map(_.nullable).forall(_ == false)) {
maybe name it resetNullBits?
ok, thanks.
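The idea behind the nullByteWriter/resetNullBits snippet above is conditional code generation: only emit the zeroOutNullBytes() call into the generated code when some grouping key is nullable. A minimal Java sketch of that decision (the helper name and flag list are illustrative, not Spark's API):

```java
import java.util.List;

public class Main {
    // Emit the null-bit reset line only when at least one field is nullable;
    // otherwise emit nothing, saving work in the generated hot loop.
    static String resetNullBits(List<Boolean> nullableFlags) {
        boolean anyNullable = nullableFlags.stream().anyMatch(n -> n);
        return anyNullable ? "agg_rowWriter.zeroOutNullBytes();" : "";
    }

    public static void main(String[] args) {
        System.out.println(resetNullBits(List.of(false, true)));
        System.out.println(resetNullBits(List.of(false, false)).isEmpty());
    }
}
```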
Force-pushed e4cec60 to 49703e8
@@ -48,6 +48,8 @@ class RowBasedHashMapGenerator(
val keySchema = ctx.addReferenceObj("keySchemaTerm", groupingKeySchema)
val valueSchema = ctx.addReferenceObj("valueSchemaTerm", bufferSchema)

val numVarLenFields = groupingKeys.map(_.dataType).count(dt => !UnsafeRow.isFixedLength(dt))
super nit: .count(!UnsafeRow.isFixedLength(_))?
Please keep the comment // TODO: consider large decimal and interval type below.
@cloud-fan We'd like to discuss: how should we modify this?
The code style doesn't matter; both are fine. But let's keep the comment.
Force-pushed 49703e8 to 3e4f2e4
LGTM cc @cloud-fan @hvanhovell @maropu
thanks, merging to master!
@cloud-fan thanks.
What changes were proposed in this pull request?
This PR improves the code generated for the fast hash map: there is no need to allocate a new block of memory for every new entry, because the UnsafeRow's memory can be reused.
How was this patch tested?
Existing test cases.
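The core optimization can be sketched as follows: a hypothetical RowWriter (loosely modeled on UnsafeRowWriter, but not Spark's class) whose buffer is allocated once and rewound with reset() for each new entry, instead of constructing a fresh writer per entry as the old generated code did.

```java
import java.util.Arrays;

class RowWriter {
    static final int NULL_BITS = 8;   // fixed-width region tracking null fields
    private final byte[] buffer;
    private int cursor;

    RowWriter(int capacity) { buffer = new byte[capacity]; }

    // Rewind past the null-bit region instead of allocating a new buffer.
    void reset() { cursor = NULL_BITS; }

    // Clear stale null bits left over from the previous entry.
    void zeroOutNullBytes() { Arrays.fill(buffer, 0, NULL_BITS, (byte) 0); }

    void writeLong(long v) {
        for (int i = 0; i < 8; i++) buffer[cursor + i] = (byte) (v >>> (8 * i));
        cursor += 8;
    }

    int size() { return cursor; }
}

public class Main {
    public static void main(String[] args) {
        RowWriter writer = new RowWriter(64); // created once, outside the loop
        for (long key = 0; key < 3; key++) {
            writer.reset();             // per-entry: rewind, no new allocation
            writer.zeroOutNullBytes();
            writer.writeLong(key);
        }
        System.out.println(writer.size());
    }
}
```

The reset()/zeroOutNullBytes() pair is exactly why the reviewers above asked to keep the TODO and to skip the null-bit reset when no grouping key is nullable.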