Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-19.1: rowexec: release buckets from hash aggregator eagerly #47520

Merged
merged 1 commit into from Apr 15, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 11 additions & 2 deletions pkg/sql/distsqlrun/aggregator.go
Expand Up @@ -414,7 +414,6 @@ func (ag *aggregatorBase) start(ctx context.Context, procName string) context.Co
func (ag *hashAggregator) close() {
if ag.InternalClose() {
log.VEventf(ag.Ctx, 2, "exiting aggregator")
ag.bucketsAcc.Close(ag.Ctx)
// If we have started emitting rows, bucketsIter will represent which
// buckets are still open, since buckets are closed once their results are
// emitted.
Expand All @@ -427,6 +426,9 @@ func (ag *hashAggregator) close() {
ag.buckets[bucket].close(ag.Ctx)
}
}
// Make sure to release any remaining memory under 'buckets'.
ag.buckets = nil
ag.bucketsAcc.Close(ag.Ctx)
ag.MemMonitor.Stop(ag.Ctx)
}
}
Expand Down Expand Up @@ -645,7 +647,14 @@ func (ag *hashAggregator) emitRow() (aggregatorState, sqlbase.EncDatumRow, *Prod
bucket := ag.bucketsIter[0]
ag.bucketsIter = ag.bucketsIter[1:]

return ag.getAggResults(ag.buckets[bucket])
// Once we get the results from the bucket, we can delete it from the map.
// This will allow us to return the memory to the system before the hash
// aggregator is fully done (which matters when we have many buckets).
// NOTE: accounting for the memory under aggregate builtins in the bucket
// is updated in getAggResults (the bucket will be closed).
state, row, meta := ag.getAggResults(ag.buckets[bucket])
delete(ag.buckets, bucket)
return state, row, meta
}

// emitRow constructs an output row from an accumulated bucket and returns it.
Expand Down