Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MpoolPushUntrusted API for gateway #3915

Merged
merged 4 commits into from
Oct 2, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ type FullNode interface {
// MpoolPush pushes a signed message to mempool.
MpoolPush(context.Context, *types.SignedMessage) (cid.Cid, error)

// MpoolPushUntrusted pushes a signed message to mempool from untrusted sources.
MpoolPushUntrusted(context.Context, *types.SignedMessage) (cid.Cid, error)

// MpoolPushMessage atomically assigns a nonce, signs, and pushes a message
// to mempool.
// maxFee is only used when GasFeeCap/GasPremium fields aren't specified
Expand Down
8 changes: 7 additions & 1 deletion api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ type FullNodeStruct struct {
MpoolPending func(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) `perm:"read"`
MpoolClear func(context.Context, bool) error `perm:"write"`

MpoolPush func(context.Context, *types.SignedMessage) (cid.Cid, error) `perm:"write"`
MpoolPush func(context.Context, *types.SignedMessage) (cid.Cid, error) `perm:"write"`
MpoolPushUntrusted func(context.Context, *types.SignedMessage) (cid.Cid, error) `perm:"write"`

MpoolPushMessage func(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error) `perm:"sign"`
MpoolGetNonce func(context.Context, address.Address) (uint64, error) `perm:"read"`
MpoolSub func(context.Context) (<-chan api.MpoolUpdate, error) `perm:"read"`
Expand Down Expand Up @@ -549,6 +551,10 @@ func (c *FullNodeStruct) MpoolPush(ctx context.Context, smsg *types.SignedMessag
return c.Internal.MpoolPush(ctx, smsg)
}

func (c *FullNodeStruct) MpoolPushUntrusted(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) {
return c.Internal.MpoolPushUntrusted(ctx, smsg)
}

func (c *FullNodeStruct) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) {
return c.Internal.MpoolPushMessage(ctx, msg, spec)
}
Expand Down
77 changes: 65 additions & 12 deletions chain/messagepool/messagepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var baseFeeLowerBoundFactor = types.NewInt(10)
var baseFeeLowerBoundFactorConservative = types.NewInt(100)

var MaxActorPendingMessages = 1000
var MaxUntrustedActorPendingMessages = 10
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Do we use that for messages coming from the messages pubsub?
  • If yes, that's likely way too low, many bigger miners have many more messages that that per tipset

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, we only use it for the gateway -- messages coming from pubsub use Add.


var MaxNonceGap = uint64(4)

Expand Down Expand Up @@ -197,9 +198,17 @@ func CapGasFee(msg *types.Message, maxFee abi.TokenAmount) {
msg.GasPremium = big.Min(msg.GasFeeCap, msg.GasPremium) // cap premium at FeeCap
}

func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict bool) (bool, error) {
func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict, untrusted bool) (bool, error) {
nextNonce := ms.nextNonce
nonceGap := false

maxNonceGap := MaxNonceGap
maxActorPendingMessages := MaxActorPendingMessages
if untrusted {
maxNonceGap = 0
maxActorPendingMessages = MaxUntrustedActorPendingMessages
}

switch {
case m.Message.Nonce == nextNonce:
nextNonce++
Expand All @@ -208,7 +217,7 @@ func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict bool) (boo
nextNonce++
}

case strict && m.Message.Nonce > nextNonce+MaxNonceGap:
case strict && m.Message.Nonce > nextNonce+maxNonceGap:
return false, xerrors.Errorf("message nonce has too big a gap from expected nonce (Nonce: %d, nextNonce: %d): %w", m.Message.Nonce, nextNonce, ErrNonceGap)

case m.Message.Nonce > nextNonce:
Expand Down Expand Up @@ -244,7 +253,7 @@ func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict bool) (boo
//ms.requiredFunds.Sub(ms.requiredFunds, exms.Message.Value.Int)
}

if !has && strict && len(ms.msgs) > MaxActorPendingMessages {
if !has && strict && len(ms.msgs) > maxActorPendingMessages {
magik6k marked this conversation as resolved.
Show resolved Hide resolved
log.Errorf("too many pending messages from actor %s", m.Message.From)
return false, ErrTooManyPendingMessages
}
Expand Down Expand Up @@ -486,7 +495,7 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
}

mp.curTsLk.Lock()
publish, err := mp.addTs(m, mp.curTs, true)
publish, err := mp.addTs(m, mp.curTs, true, false)
if err != nil {
mp.curTsLk.Unlock()
return cid.Undef, err
Expand Down Expand Up @@ -553,7 +562,7 @@ func (mp *MessagePool) Add(m *types.SignedMessage) error {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()

_, err = mp.addTs(m, mp.curTs, false)
_, err = mp.addTs(m, mp.curTs, false, false)
return err
}

Expand Down Expand Up @@ -621,7 +630,7 @@ func (mp *MessagePool) checkBalance(m *types.SignedMessage, curTs *types.TipSet)
return nil
}

func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local bool) (bool, error) {
func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local, untrusted bool) (bool, error) {
snonce, err := mp.getStateNonce(m.Message.From, curTs)
if err != nil {
return false, xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure)
Expand All @@ -643,7 +652,7 @@ func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local
return false, err
}

return publish, mp.addLocked(m, !local)
return publish, mp.addLocked(m, !local, untrusted)
}

func (mp *MessagePool) addLoaded(m *types.SignedMessage) error {
Expand Down Expand Up @@ -678,17 +687,17 @@ func (mp *MessagePool) addLoaded(m *types.SignedMessage) error {
return err
}

return mp.addLocked(m, false)
return mp.addLocked(m, false, false)
}

func (mp *MessagePool) addSkipChecks(m *types.SignedMessage) error {
mp.lk.Lock()
defer mp.lk.Unlock()

return mp.addLocked(m, false)
return mp.addLocked(m, false, false)
}

func (mp *MessagePool) addLocked(m *types.SignedMessage, strict bool) error {
func (mp *MessagePool) addLocked(m *types.SignedMessage, strict, untrusted bool) error {
log.Debugf("mpooladd: %s %d", m.Message.From, m.Message.Nonce)
if m.Signature.Type == crypto.SigTypeBLS {
mp.blsSigCache.Add(m.Cid(), m.Signature)
Expand All @@ -715,7 +724,7 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage, strict bool) error {
mp.pending[m.Message.From] = mset
}

incr, err := mset.add(m, mp, strict)
incr, err := mset.add(m, mp, strict, untrusted)
if err != nil {
log.Debug(err)
return err
Expand Down Expand Up @@ -873,7 +882,7 @@ func (mp *MessagePool) PushWithNonce(ctx context.Context, addr address.Address,
return nil, err
}

if err := mp.addLocked(msg, false); err != nil {
if err := mp.addLocked(msg, false, false); err != nil {
return nil, xerrors.Errorf("add locked failed: %w", err)
}
if err := mp.addLocal(msg, msgb); err != nil {
Expand All @@ -887,6 +896,50 @@ func (mp *MessagePool) PushWithNonce(ctx context.Context, addr address.Address,
return msg, err
}

// this method is provided for the gateway to push messages.
// differences from Push:
// - strict checks are enabled
// - extra strict add checks are used when adding the messages to the msgSet
// that means: no nonce gaps, at most 10 pending messages for the actor
func (mp *MessagePool) PushUntrusted(m *types.SignedMessage) (cid.Cid, error) {
err := mp.checkMessage(m)
if err != nil {
return cid.Undef, err
}

// serialize push access to reduce lock contention
mp.addSema <- struct{}{}
defer func() {
<-mp.addSema
}()

msgb, err := m.Serialize()
if err != nil {
return cid.Undef, err
}

mp.curTsLk.Lock()
publish, err := mp.addTs(m, mp.curTs, false, true)
if err != nil {
mp.curTsLk.Unlock()
return cid.Undef, err
}
mp.curTsLk.Unlock()

mp.lk.Lock()
if err := mp.addLocal(m, msgb); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't still want to do addLocal do we?

Copy link
Contributor Author

@vyzo vyzo Sep 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I am ambivalent about this; if we don't the messages will be transient, ie they might disappear in the next prune.
We can certainly remove the addLocal incantation.

mp.lk.Unlock()
return cid.Undef, err
}
mp.lk.Unlock()

if publish {
err = mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb)
}

return m.Cid(), err
}

func (mp *MessagePool) Remove(from address.Address, nonce uint64, applied bool) {
mp.lk.Lock()
defer mp.lk.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-gateway/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,5 +86,5 @@ func (a *GatewayAPI) MpoolPush(ctx context.Context, sm *types.SignedMessage) (ci

// TODO: additional anti-spam checks

return a.api.MpoolPush(ctx, sm)
return a.api.MpoolPushUntrusted(ctx, sm)
}
4 changes: 4 additions & 0 deletions node/impl/full/mpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ func (a *MpoolAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage) (ci
return a.Mpool.Push(smsg)
}

func (a *MpoolAPI) MpoolPushUntrusted(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) {
return a.Mpool.PushUntrusted(smsg)
}

func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) {
inMsg := *msg
{
Expand Down