Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Fix activeset weight calc performance in the proposal handler #5923

Closed
wants to merge 4 commits into from

Conversation

ivan4th
Copy link
Contributor

@ivan4th ivan4th commented May 9, 2024

Motivation

Proposal handler may cause high CPU and memory usage.

Description

As per the suggestion in #5765, the first time an active set is encountered when processing a ballot, it starts to be processed (fetch / store / calculate weight), and the concurrent attempts to process the same active set subscribe to the results.

Fixes #5765

Test Plan

Verified on a mainnet node.

TODO

  • Explain motivation or link existing issue(s)
  • Test changes and document test plan
  • Update changelog as needed

This fixes multiple concurrent retrievals of the same activeset from
the database that were causing high CPU and memory usage.

Fixes #5765
@ivan4th ivan4th force-pushed the fix/ballot-activeset-handling branch from ec063da to 4cc8520 Compare May 9, 2024 18:10
Copy link

codecov bot commented May 9, 2024

Codecov Report

Attention: Patch coverage is 89.23077% with 7 lines in your changes are missing coverage. Please review.

Project coverage is 80.7%. Comparing base (06ec6d0) to head (23c29b4).
Report is 2 commits behind head on develop.

❗ Current head 23c29b4 differs from pull request most recent head 7b2e6d2. Consider uploading reports for the commit 7b2e6d2 to get more accurate results

Files Patch % Lines
proposals/handler.go 89.2% 5 Missing and 2 partials ⚠️
Additional details and impacted files
@@           Coverage Diff           @@
##           develop   #5923   +/-   ##
=======================================
  Coverage     80.7%   80.7%           
=======================================
  Files          289     288    -1     
  Lines        29868   29857   -11     
=======================================
+ Hits         24107   24117   +10     
+ Misses        4164    4152   -12     
+ Partials      1597    1588    -9     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

proposals/handler.go Outdated Show resolved Hide resolved
proposals/handler.go Show resolved Hide resolved
proposals/handler_test.go Outdated Show resolved Hide resolved
Comment on lines 1486 to 1493
asCh <- asReq{id: sets[0].Hash()}
th.mconsumer.EXPECT().OnProposal(gomock.Eq(p[0]))
err = th.HandleSyncedProposal(context.Background(), p[0].ID().AsHash32(), pid, codec.MustEncode(p[0]))
require.NoError(t, err)

th.mconsumer.EXPECT().OnProposal(gomock.Eq(p[1]))
err = th.HandleSyncedProposal(context.Background(), p[1].ID().AsHash32(), pid, codec.MustEncode(p[1]))
require.NoError(t, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about instead of pre-generating lists of sets and proposals and using p[0], p[1], and so on in respective tests, create only the required set and proposals in the test body?

Also, the EXPECT().GetActiveSet could be much simpler and explicit (expect to be called an exact number of times) for this test. Currently, it's convoluted and hard to tell whether fetch.GetActiveSet() is expected to be called once or multiple times (it's also not checked by the test).

Consider something like:

Suggested change
asCh <- asReq{id: sets[0].Hash()}
th.mconsumer.EXPECT().OnProposal(gomock.Eq(p[0]))
err = th.HandleSyncedProposal(context.Background(), p[0].ID().AsHash32(), pid, codec.MustEncode(p[0]))
require.NoError(t, err)
th.mconsumer.EXPECT().OnProposal(gomock.Eq(p[1]))
err = th.HandleSyncedProposal(context.Background(), p[1].ID().AsHash32(), pid, codec.MustEncode(p[1]))
require.NoError(t, err)
set := types.ATXIDList{{1}, {2}}
var p []types.Proposal
for _, atx := range set {
proposals = append(proposals, gproposal(t, signer, set[0], lid, &types.EpochData{
ActiveSetHash: set.Hash(),
Beacon: types.Beacon{1},
}))
th.mconsumer.EXPECT().OnProposal(gomock.Eq(p[0]))
// NOTE: the tests expects it to be called only once (no `AnyTimes()` anymore)
th.mf.EXPECT().GetActiveSet(gomock.Any(), set.Hash()).DoAndReturn(
func(ctx context.Context, got types.Hash32) error {
require.NoError(t, activesets.Add(th.db, got, &types.EpochActiveSet{
Epoch: lid.GetEpoch(),
Set: set,
}))
for _, id := range set {
th.atxsdata.AddAtx(lid.GetEpoch(), id, &atxsdata.ATX{Node: types.NodeID{1}})
}
return nil
})
err = th.HandleSyncedProposal(context.Background(), p[0].ID().AsHash32(), pid, codec.MustEncode(p[0]))
require.NoError(t, err)
th.mconsumer.EXPECT().OnProposal(gomock.Eq(p[1]))
err = th.HandleSyncedProposal(context.Background(), p[1].ID().AsHash32(), pid, codec.MustEncode(p[1]))
require.NoError(t, err)
// maybe also something like:
require.True(t, th.mockController.Satisfied())

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The point of GetActiveSet mock handler is to be able to pause fetching so that multiple request processing for the same activeset can be checked. For the simpler one-request-after-another case, a simpler impl can be used, but the more complicated one will still have to be kept, so that it can be used for the concurrent cases. And the proposals are already pre-generated, just earlier in the outer test

Copy link
Contributor Author

@ivan4th ivan4th May 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without asCh <- asReq{id: sets[0].Hash()}, GetActiveSet doesn't go ahead, but I'll try to ensure it's called only once (although th.HandleSyncedProposal would block if it was called 2nd time)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are no concurrent calls to HandleSyncedProposal in this test case. I'm proposing not to try a generic approach that would fit all test cases, but create simple and explicit expectations per test case. There is more code, but the tests are more readable and assert more i.e how many times an expectation is called. IMHO, .AnyTimes() should be avoided when possible

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I refactored the test by adding a fixture to make it more readable, hopefully, so that the intent of each test case is not lost among EXPECT clutter

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, the tests check that GetActiveSet is called just once in each case except refetch (failed fetch / cancelation and then another handler call)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the intent of each test case is not lost among EXPECT clutter

The EXPECTs show the intent of the tests (what is expected to happen) ;)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I mean repeated large EXPECT constructs sometimes obscure the intent b/c of "just too much text". That needs some balance :)

Comment on lines 560 to 564
} else {
// mark calculation as running
h.pendingWeightCalc[id] = nil
h.weightCalcLock.Unlock()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the else is unnecessary because the code under if always returns.

Suggested change
} else {
// mark calculation as running
h.pendingWeightCalc[id] = nil
h.weightCalcLock.Unlock()
}
}
// mark calculation as running
h.pendingWeightCalc[id] = nil
h.weightCalcLock.Unlock()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you're correct, sorry
Fixed

Comment on lines 1486 to 1493
asCh <- asReq{id: sets[0].Hash()}
th.mconsumer.EXPECT().OnProposal(gomock.Eq(p[0]))
err = th.HandleSyncedProposal(context.Background(), p[0].ID().AsHash32(), pid, codec.MustEncode(p[0]))
require.NoError(t, err)

th.mconsumer.EXPECT().OnProposal(gomock.Eq(p[1]))
err = th.HandleSyncedProposal(context.Background(), p[1].ID().AsHash32(), pid, codec.MustEncode(p[1]))
require.NoError(t, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the intent of each test case is not lost among EXPECT clutter

The EXPECTs show the intent of the tests (what is expected to happen) ;)

@ivan4th
Copy link
Contributor Author

ivan4th commented May 10, 2024

bors merge

spacemesh-bors bot pushed a commit that referenced this pull request May 10, 2024
## Motivation

Proposal handler may cause high CPU and memory usage.
@spacemesh-bors
Copy link

Pull request successfully merged into develop.

Build succeeded:

@spacemesh-bors spacemesh-bors bot changed the title Fix activeset weight calc performance in the proposal handler [Merged by Bors] - Fix activeset weight calc performance in the proposal handler May 10, 2024
@spacemesh-bors spacemesh-bors bot closed this May 10, 2024
@spacemesh-bors spacemesh-bors bot deleted the fix/ballot-activeset-handling branch May 10, 2024 14:45
Comment on lines +526 to +605
func (h *Handler) getActiveSetWeight(ctx context.Context, id types.Hash32) (uint64, error) {
h.weightCalcLock.Lock()
totalWeight, exists := h.activeSets.Get(id)
if exists {
h.weightCalcLock.Unlock()
return totalWeight, nil
}

var ch chan uint64
chs, exists := h.pendingWeightCalc[id]
if exists {
// The calculation is running or the activeset is being fetched,
// subscribe.
// Avoid any blocking on the channel by making it buffered, also so that
// we don't have to wait on it in case the context is canceled
ch = make(chan uint64, 1)
h.pendingWeightCalc[id] = append(chs, ch)
h.weightCalcLock.Unlock()

// need to wait for the calculation which is already running to finish
select {
case <-ctx.Done():
return 0, ctx.Err()
case totalWeight, ok := <-ch:
if !ok {
// Channel closed, fetch / calculation failed.
// The actual error will be logged by the initiator of the
// initial fetch / calculation, let's not make an
// impression it happened multiple times and use a simpler
// message
return totalWeight, errors.New("error getting activeset weight")
}
return totalWeight, nil
}
}

// mark calculation as running
h.pendingWeightCalc[id] = nil
h.weightCalcLock.Unlock()

success := false
defer func() {
h.weightCalcLock.Lock()
// this is guaranteed not to block b/c each channel is buffered
for _, ch := range h.pendingWeightCalc[id] {
if success {
ch <- totalWeight
}
close(ch)
}
delete(h.pendingWeightCalc, id)
h.weightCalcLock.Unlock()
}()

if err := h.fetcher.GetActiveSet(ctx, id); err != nil {
return 0, err
}
set, err := activesets.Get(h.db, id)
if err != nil {
return 0, err
}
if len(set.Set) == 0 {
return 0, fmt.Errorf("%w: empty active set", pubsub.ErrValidationReject)
}

computed, used := h.atxsdata.WeightForSet(set.Epoch, set.Set)
for i := range used {
if !used[i] {
return 0, fmt.Errorf(
"missing atx %s in active set",
set.Set[i].ShortString(),
)
}
}
totalWeight = computed
h.activeSets.Add(id, totalWeight)
success = true // totalWeight will be sent to the subscribers

return totalWeight, nil
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit late to the discussion but I feel like this could have been done simpler using the golang.org/x/sync/singleflight package:

func (h *Handler) getActiveSetWeight(ctx context.Context, id types.Hash32) (uint64, error) {
	result, err, _ := h.activeSetGroup.Do(id.String(), func() (any, error) {
		totalWeight, exists := h.activeSets.Get(id)
		if exists {
			return totalWeight, nil
		}

		if err := h.fetcher.GetActiveSet(ctx, id); err != nil {
			return uint64(0), err
		}
		set, err := activesets.Get(h.db, id)
		if err != nil {
			return uint64(0), err
		}
		if len(set.Set) == 0 {
			return uint64(0), fmt.Errorf("%w: empty active set", pubsub.ErrValidationReject)
		}

		computed, used := h.atxsdata.WeightForSet(set.Epoch, set.Set)
		for i := range used {
			if !used[i] {
				return uint64(0), fmt.Errorf("missing atx %s in active set", set.Set[i])
			}
		}
		h.activeSets.Add(id, computed)
		return computed, nil
	})
	if err != nil {
		h.activeSetGroup.Forget(id.String())
	}
	return result.(uint64), err
}

No mutex or channels needed 🙂

ivan4th added a commit that referenced this pull request May 13, 2024
Proposal handler may cause high CPU and memory usage.
Backport of #5923.
fasmat added a commit that referenced this pull request May 13, 2024
…ler (#5923) (#5931)

* Fix activeset weight calc performance in the proposal handler

Proposal handler may cause high CPU and memory usage.
Backport of #5923.

* Fix typo

---------

Co-authored-by: Matthias <5011972+fasmat@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

avoid calls to WeightForSet for the same activeset
3 participants