Skip to content

Commit

Permalink
Merge pull request #47520 from yuzefovich/backport19.1-47466
Browse files Browse the repository at this point in the history
release-19.1: rowexec: release buckets from hash aggregator eagerly
  • Loading branch information
yuzefovich committed Apr 15, 2020
2 parents 1ec43d0 + 17672fb commit 86a7b62
Showing 1 changed file with 11 additions and 2 deletions.
13 changes: 11 additions & 2 deletions pkg/sql/distsqlrun/aggregator.go
Expand Up @@ -414,7 +414,6 @@ func (ag *aggregatorBase) start(ctx context.Context, procName string) context.Co
func (ag *hashAggregator) close() {
if ag.InternalClose() {
log.VEventf(ag.Ctx, 2, "exiting aggregator")
ag.bucketsAcc.Close(ag.Ctx)
// If we have started emitting rows, bucketsIter will represent which
// buckets are still open, since buckets are closed once their results are
// emitted.
Expand All @@ -427,6 +426,9 @@ func (ag *hashAggregator) close() {
ag.buckets[bucket].close(ag.Ctx)
}
}
// Make sure to release any remaining memory under 'buckets'.
ag.buckets = nil
ag.bucketsAcc.Close(ag.Ctx)
ag.MemMonitor.Stop(ag.Ctx)
}
}
Expand Down Expand Up @@ -645,7 +647,14 @@ func (ag *hashAggregator) emitRow() (aggregatorState, sqlbase.EncDatumRow, *Prod
bucket := ag.bucketsIter[0]
ag.bucketsIter = ag.bucketsIter[1:]

return ag.getAggResults(ag.buckets[bucket])
// Once we get the results from the bucket, we can delete it from the map.
// This will allow us to return the memory to the system before the hash
// aggregator is fully done (which matters when we have many buckets).
// NOTE: accounting for the memory under aggregate builtins in the bucket
// is updated in getAggResults (the bucket will be closed).
state, row, meta := ag.getAggResults(ag.buckets[bucket])
delete(ag.buckets, bucket)
return state, row, meta
}

// emitRow constructs an output row from an accumulated bucket and returns it.
Expand Down

0 comments on commit 86a7b62

Please sign in to comment.