distsqlrun: release buckets from hash aggregator eagerly
This commit makes the hash aggregator release the memory under its buckets
eagerly (as soon as we're done with a bucket) so that it can be returned to
the system. This can matter a lot when we have a large number of buckets (on
the order of 100k). Previously, this would happen only on flow shutdown,
once we lost the references to the `hashAggregator` processor. That was
problematic: we "released" the associated memory from the memory
accounting, yet we were still holding the references. With this commit we
reduce the memory footprint and get a lot closer to what our memory
accounting thinks we're using.
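
The eager release described above can be illustrated with a minimal, self-contained sketch. It is not the CockroachDB code: `toyAggregator`, `bucket`, and the string keys are hypothetical stand-ins, and only the pattern (read the result, then `delete` the bucket from the map) mirrors the change.

```go
package main

import "fmt"

// bucket is a hypothetical stand-in for the per-group aggregation state.
type bucket struct {
	sum int
}

// toyAggregator is a hypothetical, simplified model of a hash aggregator:
// a map of buckets plus the order in which they are emitted.
type toyAggregator struct {
	buckets     map[string]*bucket
	bucketsIter []string
}

// emitRow returns the result for the next bucket and then deletes that
// bucket from the map, dropping the map's reference to it so the garbage
// collector may reclaim it before the aggregator itself goes away.
func (ag *toyAggregator) emitRow() (key string, sum int, ok bool) {
	if len(ag.bucketsIter) == 0 {
		return "", 0, false
	}
	key = ag.bucketsIter[0]
	ag.bucketsIter = ag.bucketsIter[1:]
	sum = ag.buckets[key].sum
	delete(ag.buckets, key)
	return key, sum, true
}

func main() {
	ag := &toyAggregator{
		buckets:     map[string]*bucket{"a": {sum: 3}, "b": {sum: 7}},
		bucketsIter: []string{"a", "b"},
	}
	for {
		key, sum, ok := ag.emitRow()
		if !ok {
			break
		}
		fmt.Printf("%s=%d remaining=%d\n", key, sum, len(ag.buckets))
	}
}
```

Note that in Go, `delete` removes the entry but does not necessarily shrink the map's own internal storage; the saving comes from the bucket contents becoming unreachable.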

Release note (bug fix): Previously, CockroachDB was incorrectly
releasing memory used by hash aggregation (we were releasing the correct
amount from the internal memory accounting system but, by mistake, were
keeping the references to the actual memory for some time, which
prevented the memory from being garbage collected). This could lead to
a crash (which was more likely when hash aggregation had to store on the
order of 100k groups) and is now fixed.
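
The mismatch between accounting and actual references can be shown with a toy model. Everything here is an assumption made for illustration: `toyAccount` is a hypothetical counter, not CockroachDB's memory accounting API, and `heldBytes` simply sums what is still reachable through the map.

```go
package main

import "fmt"

// toyAccount is a hypothetical byte counter standing in for a memory account.
type toyAccount struct{ used int64 }

func (a *toyAccount) Grow(n int64) { a.used += n }
func (a *toyAccount) Close()       { a.used = 0 }

// toyState holds bucket payloads and the account that tracks them.
type toyState struct {
	buckets map[string][]byte
	acc     toyAccount
}

// add allocates a payload for key and registers its size with the account.
func (s *toyState) add(key string, size int) {
	s.buckets[key] = make([]byte, size)
	s.acc.Grow(int64(size))
}

// heldBytes reports how many payload bytes are still reachable via the map.
func (s *toyState) heldBytes() int64 {
	var total int64
	for _, b := range s.buckets {
		total += int64(len(b))
	}
	return total
}

// close drops the map reference first and then closes the account, so the
// accounting and the reachable memory go to zero together.
func (s *toyState) close() {
	s.buckets = nil
	s.acc.Close()
}

func main() {
	s := &toyState{buckets: map[string][]byte{}}
	s.add("a", 1024)
	s.add("b", 2048)

	// Closing only the account reproduces the old mismatch: the account
	// reports zero while the payloads are still reachable.
	s.acc.Close()
	fmt.Println(s.acc.used, s.heldBytes())

	// Dropping the map reference as well brings the two back in line.
	s.close()
	fmt.Println(s.acc.used, s.heldBytes())
}
```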
yuzefovich committed Apr 15, 2020
1 parent c406bb1 commit 17672fb
Showing 1 changed file with 11 additions and 2 deletions.
13 changes: 11 additions & 2 deletions pkg/sql/distsqlrun/aggregator.go
@@ -414,7 +414,6 @@ func (ag *aggregatorBase) start(ctx context.Context, procName string) context.Co
 func (ag *hashAggregator) close() {
 	if ag.InternalClose() {
 		log.VEventf(ag.Ctx, 2, "exiting aggregator")
-		ag.bucketsAcc.Close(ag.Ctx)
 		// If we have started emitting rows, bucketsIter will represent which
 		// buckets are still open, since buckets are closed once their results are
 		// emitted.
@@ -427,6 +426,9 @@ func (ag *hashAggregator) close() {
 				ag.buckets[bucket].close(ag.Ctx)
 			}
 		}
+		// Make sure to release any remaining memory under 'buckets'.
+		ag.buckets = nil
+		ag.bucketsAcc.Close(ag.Ctx)
 		ag.MemMonitor.Stop(ag.Ctx)
 	}
 }
@@ -645,7 +647,14 @@ func (ag *hashAggregator) emitRow() (aggregatorState, sqlbase.EncDatumRow, *Prod
 	bucket := ag.bucketsIter[0]
 	ag.bucketsIter = ag.bucketsIter[1:]
 
-	return ag.getAggResults(ag.buckets[bucket])
+	// Once we get the results from the bucket, we can delete it from the map.
+	// This will allow us to return the memory to the system before the hash
+	// aggregator is fully done (which matters when we have many buckets).
+	// NOTE: accounting for the memory under aggregate builtins in the bucket
+	// is updated in getAggResults (the bucket will be closed).
+	state, row, meta := ag.getAggResults(ag.buckets[bucket])
+	delete(ag.buckets, bucket)
+	return state, row, meta
 }
 
 // emitRow constructs an output row from an accumulated bucket and returns it.