Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: message pool: change locks to RWMutexes for performance #10561

Merged
merged 8 commits into from
Mar 30, 2023
18 changes: 9 additions & 9 deletions chain/messagepool/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ func (mp *MessagePool) CheckMessages(ctx context.Context, protos []*api.MessageP
// CheckPendingMessages performs a set of logical sets for all messages pending from a given actor
func (mp *MessagePool) CheckPendingMessages(ctx context.Context, from address.Address) ([][]api.MessageCheckStatus, error) {
var msgs []*types.Message
mp.lk.Lock()
snissn marked this conversation as resolved.
Show resolved Hide resolved
mp.lk.RLock()
mset, ok := mp.pending[from]
snissn marked this conversation as resolved.
Show resolved Hide resolved
if ok {
for _, sm := range mset.msgs {
snissn marked this conversation as resolved.
Show resolved Hide resolved
msgs = append(msgs, &sm.Message)
}
}
mp.lk.Unlock()
mp.lk.RUnlock()

if len(msgs) == 0 {
return nil, nil
Expand All @@ -58,7 +58,7 @@ func (mp *MessagePool) CheckReplaceMessages(ctx context.Context, replace []*type
msgMap := make(map[address.Address]map[uint64]*types.Message)
count := 0

mp.lk.Lock()
mp.lk.RLock()
for _, m := range replace {
mmap, ok := msgMap[m.From]
if !ok {
Expand All @@ -76,7 +76,7 @@ func (mp *MessagePool) CheckReplaceMessages(ctx context.Context, replace []*type
}
mmap[m.Nonce] = m
}
mp.lk.Unlock()
mp.lk.RUnlock()

msgs := make([]*types.Message, 0, count)
start := 0
Expand All @@ -103,9 +103,9 @@ func (mp *MessagePool) checkMessages(ctx context.Context, msgs []*types.Message,
if mp.api.IsLite() {
return nil, nil
}
mp.curTsLk.Lock()
mp.curTsLk.RLock()
curTs := mp.curTs
mp.curTsLk.Unlock()
mp.curTsLk.RUnlock()

epoch := curTs.Height() + 1

Expand Down Expand Up @@ -143,22 +143,22 @@ func (mp *MessagePool) checkMessages(ctx context.Context, msgs []*types.Message,

st, ok := state[m.From]
if !ok {
mp.lk.Lock()
mp.lk.RLock()
mset, ok := mp.pending[m.From]
if ok && !interned {
st = &actorState{nextNonce: mset.nextNonce, requiredFunds: mset.requiredFunds}
for _, m := range mset.msgs {
st.requiredFunds = new(stdbig.Int).Add(st.requiredFunds, m.Message.Value.Int)
}
state[m.From] = st
mp.lk.Unlock()
mp.lk.RUnlock()

check.OK = true
check.Hint = map[string]interface{}{
"nonce": st.nextNonce,
}
} else {
mp.lk.Unlock()
mp.lk.RUnlock()

stateNonce, err := mp.getStateNonce(ctx, m.From, curTs)
if err != nil {
Expand Down
71 changes: 47 additions & 24 deletions chain/messagepool/messagepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func init() {
}

type MessagePool struct {
lk sync.Mutex
lk sync.RWMutex

ds dtypes.MetadataDS

Expand All @@ -139,7 +139,7 @@ type MessagePool struct {

keyCache map[address.Address]address.Address

curTsLk sync.Mutex // DO NOT LOCK INSIDE lk
curTsLk sync.RWMutex // DO NOT LOCK INSIDE lk
curTs *types.TipSet

cfgLk sync.RWMutex
Expand Down Expand Up @@ -763,7 +763,30 @@ func (mp *MessagePool) Add(ctx context.Context, m *types.SignedMessage) error {
<-mp.addSema
}()

mp.curTsLk.Lock()
//Ensure block calculation is cached without holding the write lock
mp.curTsLk.RLock()
tmpCurTs := mp.curTs
mp.curTsLk.RUnlock()
_, _ = mp.api.GetActorAfter(m.Message.From, tmpCurTs)
_, _ = mp.getStateNonce(ctx, m.Message.From, tmpCurTs)

cacheSecondTime := true
//if the newly acquired Ts is not the one we just cached, let go of the lock, cache it and open the lock again and repeat....
for cacheSecondTime {
mp.curTsLk.Lock()
snissn marked this conversation as resolved.
Show resolved Hide resolved
writeCurTs := mp.curTs

if writeCurTs == tmpCurTs {
break // we have this cached we can skip
}
mp.curTsLk.Unlock()
tmpCurTs = writeCurTs
_, _ = mp.api.GetActorAfter(m.Message.From, tmpCurTs)
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
_, _ = mp.getStateNonce(ctx, m.Message.From, tmpCurTs)
cacheSecondTime = false
snissn marked this conversation as resolved.
Show resolved Hide resolved

}

defer mp.curTsLk.Unlock()

_, err = mp.addTs(ctx, m, mp.curTs, false, false)
Expand Down Expand Up @@ -852,14 +875,14 @@ func (mp *MessagePool) addTs(ctx context.Context, m *types.SignedMessage, curTs
return false, xerrors.Errorf("minimum expected nonce is %d: %w", snonce, ErrNonceTooLow)
}

mp.lk.Lock()
defer mp.lk.Unlock()

senderAct, err := mp.api.GetActorAfter(m.Message.From, curTs)
if err != nil {
return false, xerrors.Errorf("failed to get sender actor: %w", err)
}

mp.lk.Lock()
snissn marked this conversation as resolved.
Show resolved Hide resolved
defer mp.lk.Unlock()

// This message can only be included in the _next_ epoch and beyond, hence the +1.
epoch := curTs.Height() + 1
nv := mp.api.StateNetworkVersion(ctx, epoch)
Expand Down Expand Up @@ -1001,19 +1024,19 @@ func (mp *MessagePool) addLocked(ctx context.Context, m *types.SignedMessage, st
}

func (mp *MessagePool) GetNonce(ctx context.Context, addr address.Address, _ types.TipSetKey) (uint64, error) {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
mp.curTsLk.RLock()
defer mp.curTsLk.RUnlock()

mp.lk.Lock()
defer mp.lk.Unlock()
mp.lk.RLock()
defer mp.lk.RUnlock()

return mp.getNonceLocked(ctx, addr, mp.curTs)
snissn marked this conversation as resolved.
Show resolved Hide resolved
}

// GetActor should not be used. It is only here to satisfy interface mess caused by lite node handling
func (mp *MessagePool) GetActor(_ context.Context, addr address.Address, _ types.TipSetKey) (*types.Actor, error) {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
mp.curTsLk.RLock()
defer mp.curTsLk.RUnlock()
return mp.api.GetActorAfter(addr, mp.curTs)
}

Expand Down Expand Up @@ -1164,11 +1187,11 @@ func (mp *MessagePool) remove(ctx context.Context, from address.Address, nonce u
}

func (mp *MessagePool) Pending(ctx context.Context) ([]*types.SignedMessage, *types.TipSet) {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
mp.curTsLk.RLock()
defer mp.curTsLk.RUnlock()

mp.lk.Lock()
defer mp.lk.Unlock()
mp.lk.RLock()
defer mp.lk.RUnlock()

return mp.allPending(ctx)
}
Expand All @@ -1184,11 +1207,11 @@ func (mp *MessagePool) allPending(ctx context.Context) ([]*types.SignedMessage,
}

func (mp *MessagePool) PendingFor(ctx context.Context, a address.Address) ([]*types.SignedMessage, *types.TipSet) {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
mp.curTsLk.RLock()
defer mp.curTsLk.RUnlock()

mp.lk.Lock()
defer mp.lk.Unlock()
mp.lk.RLock()
defer mp.lk.RUnlock()
return mp.pendingFor(ctx, a), mp.curTs
snissn marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down Expand Up @@ -1237,9 +1260,9 @@ func (mp *MessagePool) HeadChange(ctx context.Context, revert []*types.TipSet, a

maybeRepub := func(cid cid.Cid) {
if !repubTrigger {
mp.lk.Lock()
mp.lk.RLock()
_, republished := mp.republished[cid]
mp.lk.Unlock()
mp.lk.RUnlock()
if republished {
repubTrigger = true
}
Expand Down Expand Up @@ -1310,9 +1333,9 @@ func (mp *MessagePool) HeadChange(ctx context.Context, revert []*types.TipSet, a
}

if len(revert) > 0 && futureDebug {
mp.lk.Lock()
mp.lk.RLock()
msgs, ts := mp.allPending(ctx)
mp.lk.Unlock()
mp.lk.RUnlock()

buckets := map[address.Address]*statBucket{}

Expand Down
5 changes: 3 additions & 2 deletions chain/messagepool/repub.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@ const repubMsgLimit = 30
var RepublishBatchDelay = 100 * time.Millisecond

func (mp *MessagePool) republishPendingMessages(ctx context.Context) error {
mp.curTsLk.Lock()
mp.curTsLk.RLock()
ts := mp.curTs

baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
mp.curTsLk.RUnlock()
snissn marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
mp.curTsLk.Unlock()
return xerrors.Errorf("computing basefee: %w", err)
}
baseFeeLowerBound := getBaseFeeLowerBound(baseFee, baseFeeLowerBoundFactor)

pending := make(map[address.Address]map[uint64]*types.SignedMessage)
mp.curTsLk.Lock()
snissn marked this conversation as resolved.
Show resolved Hide resolved
mp.lk.Lock()
mp.republished = nil // clear this to avoid races triggering an early republish
snissn marked this conversation as resolved.
Show resolved Hide resolved
mp.forEachLocal(ctx, func(ctx context.Context, actor address.Address) {
Expand Down
1 change: 1 addition & 0 deletions chain/messagepool/selection.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()

//TODO confirm if we can switch to RLock here for performance
snissn marked this conversation as resolved.
Show resolved Hide resolved
mp.lk.Lock()
defer mp.lk.Unlock()

Expand Down