Skip to content

Commit

Permalink
Fix activeset weight calc performance in the proposal handler (#5923)
Browse files Browse the repository at this point in the history
## Motivation

Proposal handler may cause high CPU and memory usage.
  • Loading branch information
ivan4th committed May 10, 2024
1 parent 65d3eed commit b57e131
Show file tree
Hide file tree
Showing 3 changed files with 301 additions and 86 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Expand Up @@ -36,6 +36,9 @@ through v1.5.x first ([#5907](https://github.com/spacemeshos/go-spacemesh/pull/5

* [#5911](https://github.com/spacemeshos/go-spacemesh/pull/5911) Avoid pulling poet proof multiple times in 1:N setups

* [#5923](https://github.com/spacemeshos/go-spacemesh/pull/5923) Fix high memory consumption and performance issues
in the proposal handler

## (v1.5.0)

### Upgrade information
Expand Down
162 changes: 110 additions & 52 deletions proposals/handler.go
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

lru "github.com/hashicorp/golang-lru/v2"
Expand Down Expand Up @@ -48,16 +49,18 @@ type Handler struct {
logger log.Log
cfg Config

db *sql.Database
atxsdata *atxsdata.Data
activeSets *lru.Cache[types.Hash32, uint64]
edVerifier *signing.EdVerifier
publisher pubsub.Publisher
fetcher system.Fetcher
mesh meshProvider
validator eligibilityValidator
tortoise tortoiseProvider
clock layerClock
db *sql.Database
atxsdata *atxsdata.Data
activeSets *lru.Cache[types.Hash32, uint64]
edVerifier *signing.EdVerifier
publisher pubsub.Publisher
fetcher system.Fetcher
mesh meshProvider
validator eligibilityValidator
tortoise tortoiseProvider
weightCalcLock sync.Mutex
pendingWeightCalc map[types.Hash32][]chan uint64
clock layerClock

proposals proposalsConsumer
}
Expand Down Expand Up @@ -123,18 +126,19 @@ func NewHandler(
panic(err)
}
b := &Handler{
logger: log.NewNop(),
cfg: defaultConfig(),
db: db,
atxsdata: atxsdata,
proposals: proposals,
activeSets: activeSets,
edVerifier: edVerifier,
publisher: p,
fetcher: f,
mesh: m,
tortoise: tortoise,
clock: clock,
logger: log.NewNop(),
cfg: defaultConfig(),
db: db,
atxsdata: atxsdata,
proposals: proposals,
activeSets: activeSets,
edVerifier: edVerifier,
publisher: p,
fetcher: f,
mesh: m,
tortoise: tortoise,
pendingWeightCalc: make(map[types.Hash32][]chan uint64),
clock: clock,
}
for _, opt := range opts {
opt(b)
Expand Down Expand Up @@ -519,6 +523,87 @@ func (h *Handler) checkBallotSyntacticValidity(
return decoded, nil
}

func (h *Handler) getActiveSetWeight(ctx context.Context, id types.Hash32) (uint64, error) {
h.weightCalcLock.Lock()
totalWeight, exists := h.activeSets.Get(id)
if exists {
h.weightCalcLock.Unlock()
return totalWeight, nil
}

var ch chan uint64
chs, exists := h.pendingWeightCalc[id]
if exists {
// The calculation is running or the activeset is being fetched,
// subscribe.
// Avoid any blocking on the channel by making it buffered, also so that
// we don't have to wait on it in case the context is canceled
ch = make(chan uint64, 1)
h.pendingWeightCalc[id] = append(chs, ch)
h.weightCalcLock.Unlock()

// need to wait for the calculation which is already running to finish
select {
case <-ctx.Done():
return 0, ctx.Err()
case totalWeight, ok := <-ch:
if !ok {
// Channel closed, fetch / calculation failed.
// The actual error will be logged by the initiator of the
// initial fetch / calculation, let's not make an
// impression it happened multiple times and use a simpler
// message
return totalWeight, errors.New("error getting activeset weight")
}
return totalWeight, nil
}
}

// mark calculation as running
h.pendingWeightCalc[id] = nil
h.weightCalcLock.Unlock()

success := false
defer func() {
h.weightCalcLock.Lock()
// this is guaranteed not to block b/c each channel is buffered
for _, ch := range h.pendingWeightCalc[id] {
if success {
ch <- totalWeight
}
close(ch)
}
delete(h.pendingWeightCalc, id)
h.weightCalcLock.Unlock()
}()

if err := h.fetcher.GetActiveSet(ctx, id); err != nil {
return 0, err
}
set, err := activesets.Get(h.db, id)
if err != nil {
return 0, err
}
if len(set.Set) == 0 {
return 0, fmt.Errorf("%w: empty active set", pubsub.ErrValidationReject)
}

computed, used := h.atxsdata.WeightForSet(set.Epoch, set.Set)
for i := range used {
if !used[i] {
return 0, fmt.Errorf(
"missing atx %s in active set",
set.Set[i].ShortString(),
)
}
}
totalWeight = computed
h.activeSets.Add(id, totalWeight)
success = true // totalWeight will be sent to the subscribers

return totalWeight, nil
}

func (h *Handler) checkBallotDataIntegrity(ctx context.Context, b *types.Ballot) (uint64, error) {
//nolint:nestif
if b.RefBallot == types.EmptyBallotID {
Expand All @@ -534,36 +619,9 @@ func (h *Handler) checkBallotDataIntegrity(ctx context.Context, b *types.Ballot)
epoch-- // download activesets in the previous epoch too
}
if b.Layer.GetEpoch() >= epoch {
var exists bool
totalWeight, exists := h.activeSets.Get(b.EpochData.ActiveSetHash)
if !exists {
if err := h.fetcher.GetActiveSet(ctx, b.EpochData.ActiveSetHash); err != nil {
return 0, err
}
set, err := activesets.Get(h.db, b.EpochData.ActiveSetHash)
if err != nil {
return 0, err
}
if len(set.Set) == 0 {
return 0, fmt.Errorf(
"%w: empty active set ballot %s",
pubsub.ErrValidationReject,
b.ID().String(),
)
}

computed, used := h.atxsdata.WeightForSet(set.Epoch, set.Set)
for i := range used {
if !used[i] {
return 0, fmt.Errorf(
"missing atx %s in active set ballot %s",
set.Set[i].ShortString(),
b.ID().String(),
)
}
}
totalWeight = computed
h.activeSets.Add(b.EpochData.ActiveSetHash, totalWeight)
totalWeight, err := h.getActiveSetWeight(ctx, b.EpochData.ActiveSetHash)
if err != nil {
return 0, fmt.Errorf("ballot %s: %w", b.ID().String(), err)
}
return totalWeight, nil
}
Expand Down

0 comments on commit b57e131

Please sign in to comment.