Skip to content

Commit

Permalink
fix: correctly count the swingset messages in a cosmos message
Browse files Browse the repository at this point in the history
  • Loading branch information
JimLarson committed Mar 8, 2023
1 parent f941b26 commit daa16b1
Show file tree
Hide file tree
Showing 2 changed files with 281 additions and 60 deletions.
153 changes: 93 additions & 60 deletions golang/cosmos/ante/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,83 +10,116 @@ import (
swingtypes "github.com/Agoric/agoric-sdk/golang/cosmos/x/swingset/types"
)

// This AnteDecorator enforces a limit on the size of the Swingset inbound
// queue by scanning for Cosmos-messages which add Swingset-messages to
// that queue. Note that when running DeliverTx, inbound messages are
// staged in the action queue, then transferred to the inbound queue in
// end-block processing. Previous Txs in the block will have already been
// added to the action queue, so we must reject Txs which would grouw the
// action queue beyond the allowed inbound size.

// We would like to reject messages during mempool admission (CheckTx)
// rather than during block execution (DeliverTx), but at CheckTx time
// we don't know how many messages will be allowed at DeliverTx time,
// nor the size of the action queue from preceding Txs in the block.
// To mitigate this, Swingset should implement hysteresis by computing
// the number of messages allowed for mempool admission as if its max queue
// length was lower (e.g. 50%). This is the QueueInboundMempool entry in the
// Swingset state QueueAllowed field. At DeliverTx time the QueueInbound
// entry gives the number of allowed messages.

const (
maxInboundPerTx = 1
)

// TODO: We don't have a more appropriate error type for this.
var ErrInboundQueueFull = sdkerrors.ErrMempoolIsFull

// inboundAnte is an sdk.AnteDecorator which enforces the allowed size of the inbound queue.
type inboundAnte struct {
sk SwingsetKeeper
}

// NewInboundDecorator return an AnteDecorator which honors the allowed size of the inbound queue.
// The swingset message types will consume one allowed entry each, and will be reflected
// in the action queue when executed.
// NewInboundDecorator returns an AnteDecorator which honors the allowed size of the inbound queue.
func NewInboundDecorator(sk SwingsetKeeper) sdk.AnteDecorator {
return inboundAnte{sk: sk}
}

// AnteHandle implements sdk.AnteDecorator
// AnteHandle implements sdk.AnteDecorator.
// Lazily consults the Swingset state to avoid overhead when dealing
// with pure Cosmos-level Txs.
func (ia inboundAnte) AnteHandle(ctx sdk.Context, tx sdk.Tx, simulate bool, next sdk.AnteHandler) (sdk.Context, error) {
msgs := tx.GetMsgs()
inboundsAllowed := int32(-1)
for _, msg := range msgs {
switch msg.(type) {
case *swingtypes.MsgDeliverInbound,
*swingtypes.MsgInstallBundle,
*swingtypes.MsgProvision,
*swingtypes.MsgWalletAction,
*swingtypes.MsgWalletSpendAction:
// Lazily compute the number of allowed messages when the transaction
// includes a SwingSet message. This number is the difference between
// the number of allowed messages initially returned by SwingSet, and
// the number of messages already added into the actionQueue by other
// transactions included in the block.
// We store the number of allowed messages locally since messages are
// added to the actionQueue after going through the ante handler (in
// CheckTx) and before the next transaction is processed.
// However in CheckTx (mempool admission check), no state is
// changed so it's possible for a set of transactions to exist which
// if all included in a block would push the actionQueue over, and thus
// end up in rejections instead of simply not admitting them in the
// mempool. To mitigate this, Swingset should compute the number of
// messages allowed for mempool admission as if its max queue length
// was lower (e.g. 50%). This is the QueueInboundMempool entry.
if inboundsAllowed == -1 {
state := ia.sk.GetState(ctx)
entry := swingtypes.QueueInbound
if ctx.IsCheckTx() {
entry = swingtypes.QueueInboundMempool
}
allowed, found := swingtypes.QueueSizeEntry(state.QueueAllowed, entry)
if found {
actions, err := ia.sk.ActionQueueLength(ctx)
if err != nil {
return ctx, err
}
if actions < allowed {
inboundsAllowed = allowed - actions
} else {
inboundsAllowed = 0
}
} else {
// if number of allowed entries not given, fail closed
inboundsAllowed = 0
}
}
if inboundsAllowed > 0 {
inboundsAllowed--
} else {
defer func() {
telemetry.IncrCounterWithLabels(
[]string{"tx", "ante", "inbound_not_allowed"},
1,
[]metrics.Label{
telemetry.NewLabel("msg", sdk.MsgTypeURL(msg)),
},
)
}()
return ctx, ErrInboundQueueFull
inbounds := inboundMessages(msg)
if inbounds == 0 {
continue
}
if inboundsAllowed == -1 {
var err error
inboundsAllowed, err = ia.allowedInbound(ctx)
if err != nil {
return ctx, err
}
}
if inboundsAllowed >= inbounds {
inboundsAllowed -= inbounds
} else {
defer func() {
telemetry.IncrCounterWithLabels(
[]string{"tx", "ante", "inbound_not_allowed"},
1,
[]metrics.Label{
telemetry.NewLabel("msg", sdk.MsgTypeURL(msg)),
},
)
}()
return ctx, ErrInboundQueueFull
}
}
return next(ctx, tx, simulate)
}

// allowedInbound returns the allowed number of inbound queue messages or an error.
// Look up the limit from the swingset state queue sizes: from QueueInboundMempool
// if we're running CheckTx (for the hysteresis described above), otherwise QueueAllowed.
func (ia inboundAnte) allowedInbound(ctx sdk.Context) (int32, error) {
state := ia.sk.GetState(ctx)
entry := swingtypes.QueueInbound
if ctx.IsCheckTx() {
entry = swingtypes.QueueInboundMempool
}
allowed, found := swingtypes.QueueSizeEntry(state.QueueAllowed, entry)
if !found {
// if number of allowed entries not given, fail closed
return 0, nil
}
actions, err := ia.sk.ActionQueueLength(ctx)
if err != nil {
return 0, err
}
if actions >= allowed {
return 0, nil
}
allowed -= actions
if allowed > maxInboundPerTx {
return maxInboundPerTx, nil
}
return allowed, nil
}

// inboundMessages returns the nunber of inbound queue messages in msg.
func inboundMessages(msg sdk.Msg) int32 {
switch m := msg.(type) {
case *swingtypes.MsgDeliverInbound:
return int32(len(m.Messages))

case *swingtypes.MsgInstallBundle,
*swingtypes.MsgProvision,
*swingtypes.MsgWalletAction,
*swingtypes.MsgWalletSpendAction:
return 1
}
return 0
}
188 changes: 188 additions & 0 deletions golang/cosmos/ante/inbound_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package ante

import (
"fmt"
"reflect"
"testing"

swingtypes "github.com/Agoric/agoric-sdk/golang/cosmos/x/swingset/types"
"github.com/cosmos/cosmos-sdk/codec/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/tx"
banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"
"github.com/gogo/protobuf/proto"
)

func TestInboundAnteHandle(t *testing.T) {
for _, tt := range []struct {
name string
checkTx bool
tx sdk.Tx
simulate bool
actionQueueLength int32
actionQueueLengthErr error
inboundLimit int32
mempoolLimit int32
errMsg string
}{
{
name: "empty-empty",
tx: makeTestTx(),
},
{
name: "reject-on-zero-allowed",
tx: makeTestTx(&swingtypes.MsgInstallBundle{}),
errMsg: ErrInboundQueueFull.Error(),
},
{
name: "has-room",
tx: makeTestTx(&swingtypes.MsgWalletAction{}),
inboundLimit: 10,
},
{
name: "no-room",
tx: makeTestTx(&swingtypes.MsgWalletAction{}, &swingtypes.MsgInstallBundle{}),
inboundLimit: 10,
actionQueueLength: 9,
errMsg: ErrInboundQueueFull.Error(),
},
{
name: "state-lookup-error",
tx: makeTestTx(&swingtypes.MsgWalletAction{}, &swingtypes.MsgInstallBundle{}),
inboundLimit: 10,
actionQueueLengthErr: fmt.Errorf("sunspots"),
errMsg: "sunspots",
},
{
name: "allow-non-swingset-msgs",
tx: makeTestTx(&banktypes.MsgSend{}, &banktypes.MsgSend{}),
inboundLimit: 1,
},
{
name: "lazy-queue-length-lookup",
tx: makeTestTx(&banktypes.MsgSend{}, &banktypes.MsgSend{}),
inboundLimit: 1,
actionQueueLengthErr: fmt.Errorf("sunspots"),
},
{
name: "checktx",
checkTx: true,
tx: makeTestTx(&swingtypes.MsgWalletAction{}),
inboundLimit: 10,
mempoolLimit: 5,
actionQueueLength: 7,
errMsg: ErrInboundQueueFull.Error(),
},
{
name: "empty-queue-allowed",
tx: makeTestTx(&swingtypes.MsgWalletAction{}),
inboundLimit: -1,
errMsg: ErrInboundQueueFull.Error(),
},
{
name: "already-full",
tx: makeTestTx(&swingtypes.MsgWalletAction{}),
inboundLimit: 10,
actionQueueLength: 10,
errMsg: ErrInboundQueueFull.Error(),
},
{
name: "deliver-inbound-empty",
tx: makeTestTx(&swingtypes.MsgDeliverInbound{}),
inboundLimit: 10,
actionQueueLength: 20,
},
{
name: "deliver-inbound-one",
tx: makeTestTx(&swingtypes.MsgDeliverInbound{
Messages: []string{"foo"},
}),
inboundLimit: 2,
actionQueueLength: 1,
},
{
name: "deliver-inbound-one",
tx: makeTestTx(&swingtypes.MsgDeliverInbound{
Messages: []string{"foo", "bar"},
}),
inboundLimit: 2,
actionQueueLength: 1,
errMsg: ErrInboundQueueFull.Error(),
},
} {
t.Run(tt.name, func(t *testing.T) {
ctx := sdk.Context{}.WithIsCheckTx(tt.checkTx)
emptyQueueAllowed := false
if tt.inboundLimit == -1 {
emptyQueueAllowed = true
}
mock := mockSwingsetKeeper{
actionQueueLength: tt.actionQueueLength,
actionQueueLengthErr: tt.actionQueueLengthErr,
inboundLimit: tt.inboundLimit,
mempoolLimit: tt.mempoolLimit,
emptyQueueAllowed: emptyQueueAllowed,
}
decorator := NewInboundDecorator(mock)
newCtx, err := decorator.AnteHandle(ctx, tt.tx, tt.simulate, nilAnteHandler)
if !reflect.DeepEqual(newCtx, ctx) {
t.Errorf("want ctx %v, got %v", ctx, newCtx)
}
if err != nil {
if tt.errMsg == "" {
t.Errorf("want no error, got %s", err.Error())
} else if err.Error() != tt.errMsg {
t.Errorf("want error %s, got %s", tt.errMsg, err.Error())
}
} else if tt.errMsg != "" {
t.Errorf("want error %s, got none", tt.errMsg)
}
})
}
}

func makeTestTx(msgs ...proto.Message) sdk.Tx {
wrappedMsgs := make([]*types.Any, len(msgs))
for i, m := range msgs {
any, err := types.NewAnyWithValue(m)
if err != nil {
panic(err)
}
wrappedMsgs[i] = any
}
return &tx.Tx{
Body: &tx.TxBody{
Messages: wrappedMsgs,
},
}
}

func nilAnteHandler(ctx sdk.Context, tx sdk.Tx, simulate bool) (newCtx sdk.Context, err error) {
return ctx, nil
}

type mockSwingsetKeeper struct {
actionQueueLength int32
actionQueueLengthErr error
inboundLimit int32
mempoolLimit int32
emptyQueueAllowed bool
}

var _ SwingsetKeeper = mockSwingsetKeeper{}

func (msk mockSwingsetKeeper) ActionQueueLength(ctx sdk.Context) (int32, error) {
return msk.actionQueueLength, msk.actionQueueLengthErr
}

func (msk mockSwingsetKeeper) GetState(ctx sdk.Context) swingtypes.State {
if msk.emptyQueueAllowed {
return swingtypes.State{}
}
return swingtypes.State{
QueueAllowed: []swingtypes.QueueSize{
swingtypes.NewQueueSize(swingtypes.QueueInbound, msk.inboundLimit),
swingtypes.NewQueueSize(swingtypes.QueueInboundMempool, msk.mempoolLimit),
},
}
}

0 comments on commit daa16b1

Please sign in to comment.