Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Do Not Merge] Async Ack #1876

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
11 changes: 11 additions & 0 deletions docs/proto/proto-docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
- [MsgIBCCloseChannel](#cosmwasm.wasm.v1.MsgIBCCloseChannel)
- [MsgIBCSend](#cosmwasm.wasm.v1.MsgIBCSend)
- [MsgIBCSendResponse](#cosmwasm.wasm.v1.MsgIBCSendResponse)
- [MsgIBCWriteAcknowledgementResponse](#cosmwasm.wasm.v1.MsgIBCWriteAcknowledgementResponse)

- [cosmwasm/wasm/v1/proposal_legacy.proto](#cosmwasm/wasm/v1/proposal_legacy.proto)
- [AccessConfigUpdate](#cosmwasm.wasm.v1.AccessConfigUpdate)
Expand Down Expand Up @@ -641,6 +642,16 @@ MsgIBCSendResponse




<a name="cosmwasm.wasm.v1.MsgIBCWriteAcknowledgementResponse"></a>

### MsgIBCWriteAcknowledgementResponse
MsgIBCWriteAcknowledgementResponse





<!-- end messages -->

<!-- end enums -->
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ replace (
github.com/syndtr/goleveldb => github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
)

// TODO: remove this once we have a release
replace github.com/CosmWasm/wasmvm/v2 => /Users/christoph/Projects/wasmvm

retract (
// see https://github.com/CosmWasm/wasmd/issues/1713
v0.44.0
Expand Down
3 changes: 3 additions & 0 deletions proto/cosmwasm/wasm/v1/ibc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ message MsgIBCSendResponse {
uint64 sequence = 1;
}

// MsgIBCWriteAcknowledgementResponse
message MsgIBCWriteAcknowledgementResponse {}

// MsgIBCCloseChannel port and channel need to be owned by the contract
message MsgIBCCloseChannel {
string channel = 2 [ (gogoproto.moretags) = "yaml:\"source_channel\"" ];
Expand Down
1 change: 0 additions & 1 deletion x/wasm/client/cli/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ func parseStoreCodeArgs(file, sender string, flags *flag.FlagSet) (types.MsgStor
// gzip the wasm file
if ioutils.IsWasm(wasm) {
wasm, err = ioutils.GzipIt(wasm)

if err != nil {
return types.MsgStoreCode{}, err
}
Expand Down
8 changes: 7 additions & 1 deletion x/wasm/ibc.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,16 @@ func (i IBCHandler) OnRecvPacket(
if err != nil {
ack = channeltypes.NewErrorAcknowledgement(err)
// the state gets reverted, so we drop all captured events
} else if ack == nil || ack.Success() {
} else if ack == nil || ack.Acknowledgement() == nil {
// emit all contract and submessage events on success
// nil ack is a success case, see: https://github.com/cosmos/ibc-go/blob/v7.0.0/modules/core/keeper/msg_server.go#L453
ctx.EventManager().EmitEvents(em.Events())
// contract wants async acknowledgement, so store the packet for later
i.keeper.StoreAsyncAckPacket(ctx, packet)
ack = nil
} else if ack.Success() {
// emit all contract and submessage events on success
ctx.EventManager().EmitEvents(em.Events())
chipshort marked this conversation as resolved.
Show resolved Hide resolved
}
types.EmitAcknowledgementEvent(ctx, contractAddr, ack, err)
return ack
Expand Down
97 changes: 95 additions & 2 deletions x/wasm/ibc_integration_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package wasm_test

import (
"encoding/base64"
"encoding/json"
"fmt"
"testing"

wasmvm "github.com/CosmWasm/wasmvm/v2"
Expand Down Expand Up @@ -178,6 +180,9 @@ func TestOnIBCPacketReceive(t *testing.T) {
packetData: []byte(`{"panic":{}}`),
expPacketNotHandled: true,
},
"without ack": {
packetData: []byte(`{"no_ack":{}}`),
},
}
for name, spec := range specs {
t.Run(name, func(t *testing.T) {
Expand Down Expand Up @@ -230,13 +235,13 @@ func TestOnIBCPacketReceive(t *testing.T) {

// then
if spec.expPacketNotHandled {
const contractPanicToErrMsg = `recovered: Error calling the VM: Error executing Wasm: Wasmer runtime error: RuntimeError: Aborted: panicked at 'This page intentionally faulted'`
const contractPanicToErrMsg = `recovered: Error calling the VM: Error executing Wasm: Wasmer runtime error: RuntimeError: Aborted: panicked at`
assert.ErrorContains(t, err, contractPanicToErrMsg)
require.Nil(t, *capturedAck)
return
}
require.NoError(t, err)
if spec.expAck != nil {
require.NoError(t, err)
assert.Equal(t, spec.expAck, *capturedAck, string(*capturedAck))
} else {
require.Nil(t, *capturedAck)
Expand All @@ -245,6 +250,94 @@ func TestOnIBCPacketReceive(t *testing.T) {
}
}

func TestIBCAsyncAck(t *testing.T) {
// given 2 chains with a mock on chain A to control the IBC flow
// and the ibc-reflect contract on chain B
// when the no_ack package is relayed
// then the contract does not produce an ack
// and
// when the async_ack message is executed on chain B
// then the contract produces the ack

ackBytes := []byte("my ack")

mockContractEngine := NewCaptureAckTestContractEngine()
chainAOpts := []wasmkeeper.Option{
wasmkeeper.WithWasmEngine(mockContractEngine),
}
var (
coord = wasmibctesting.NewCoordinator(t, 2, chainAOpts)
chainA = coord.GetChain(wasmibctesting.GetChainID(1))
chainB = coord.GetChain(wasmibctesting.GetChainID(2))
)
// setup chain A contract metadata for mock
myMockContractAddr := chainA.SeedNewContractInstance() // setups env but uses mock contract

// setup chain B contracts
reflectID := chainB.StoreCodeFile("./keeper/testdata/reflect_1_5.wasm").CodeID
initMsg, err := json.Marshal(wasmkeeper.IBCReflectInitMsg{ReflectCodeID: reflectID})
require.NoError(t, err)
codeID := chainB.StoreCodeFile("./keeper/testdata/ibc_reflect.wasm").CodeID
ibcReflectContractAddr := chainB.InstantiateContract(codeID, initMsg)

// establish IBC channels
var (
sourcePortID = chainA.ContractInfo(myMockContractAddr).IBCPortID
counterpartPortID = chainB.ContractInfo(ibcReflectContractAddr).IBCPortID
path = wasmibctesting.NewPath(chainA, chainB)
)
path.EndpointA.ChannelConfig = &ibctesting.ChannelConfig{
PortID: sourcePortID, Version: "ibc-reflect-v1", Order: channeltypes.UNORDERED,
}
path.EndpointB.ChannelConfig = &ibctesting.ChannelConfig{
PortID: counterpartPortID, Version: "ibc-reflect-v1", Order: channeltypes.UNORDERED,
}

coord.SetupConnections(path)
coord.CreateChannels(path)
coord.CommitBlock(chainA, chainB)
require.Equal(t, 0, len(chainA.PendingSendPackets))
require.Equal(t, 0, len(chainB.PendingSendPackets))

// when the "no_ack" ibc packet is sent from chain A to chain B
capturedAck := mockContractEngine.SubmitIBCPacket(t, path, chainA, myMockContractAddr, []byte(`{"no_ack":{}}`))
coord.CommitBlock(chainA, chainB)

require.Equal(t, 1, len(chainA.PendingSendPackets))
require.Equal(t, 0, len(chainB.PendingSendPackets))

// we don't expect an ack yet
err = path.RelayPacketWithoutAck(chainA.PendingSendPackets[0], nil)
noAckPacket := chainA.PendingSendPackets[0]
chainA.PendingSendPackets = []channeltypes.Packet{}
require.NoError(t, err)
assert.Nil(t, *capturedAck)

// when the "async_ack" ibc packet is sent from chain A to chain B
destChannel := path.EndpointB.ChannelID
packetSeq := 1
ackData := base64.StdEncoding.EncodeToString(ackBytes)
ack := fmt.Sprintf(`{"data":"%s"}`, ackData)
msg := fmt.Sprintf(`{"async_ack":{"channel_id":"%s","packet_sequence": "%d", "ack": %s}}`, destChannel, packetSeq, ack)
res, err := chainB.SendMsgs(&types.MsgExecuteContract{
Sender: chainB.SenderAccount.GetAddress().String(),
Contract: ibcReflectContractAddr.String(),
Msg: []byte(msg),
})
require.NoError(t, err)

// relay the ack
err = path.EndpointA.UpdateClient()
require.NoError(t, err)
acknowledgement, err := wasmibctesting.ParseAckFromEvents(res.GetEvents())
require.NoError(t, err)
err = path.EndpointA.AcknowledgePacket(noAckPacket, acknowledgement)
require.NoError(t, err)

// now ack for the no_ack packet should have arrived
require.Equal(t, ackBytes, *capturedAck)
}

// mock to submit an ibc data package from given chain and capture the ack
type captureAckTestContractEngine struct {
*wasmtesting.MockWasmEngine
Expand Down
28 changes: 12 additions & 16 deletions x/wasm/ibc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cosmos/cosmos-sdk/types/address"

"github.com/CosmWasm/wasmd/x/wasm/keeper"
"github.com/CosmWasm/wasmd/x/wasm/keeper/wasmtesting"
"github.com/CosmWasm/wasmd/x/wasm/types"
)

Expand Down Expand Up @@ -101,14 +102,14 @@ func TestOnRecvPacket(t *testing.T) {
}
for name, spec := range specs {
t.Run(name, func(t *testing.T) {
mock := IBCContractKeeperMock{
mock := wasmtesting.IBCContractKeeperMock{
OnRecvPacketFn: func(ctx sdk.Context, contractAddr sdk.AccAddress, msg wasmvmtypes.IBCPacketReceiveMsg) (ibcexported.Acknowledgement, error) {
// additional custom event to confirm event handling on state commit/ rollback
ctx.EventManager().EmitEvent(myCustomEvent)
return spec.contractRsp, spec.contractOkMsgExecErr
},
}
h := NewIBCHandler(mock, nil, nil)
h := NewIBCHandler(&mock, nil, nil)
em := &sdk.EventManager{}
ctx := sdk.Context{}.WithEventManager(em)
if spec.expPanic {
Expand All @@ -120,6 +121,15 @@ func TestOnRecvPacket(t *testing.T) {
gotAck := h.OnRecvPacket(ctx, spec.ibcPkg, anyRelayerAddr)
assert.Equal(t, spec.expAck, gotAck)
assert.Equal(t, spec.expEvents, em.Events())

// make sure packet is stored for async ack
asyncAckPacket, err := mock.LoadAsyncAckPacket(ctx, spec.ibcPkg.DestinationPort, spec.ibcPkg.DestinationChannel, spec.ibcPkg.Sequence)
if spec.expAck == nil {
assert.NoError(t, err)
assert.Equal(t, spec.ibcPkg, asyncAckPacket)
} else {
assert.Error(t, err)
}
})
}
}
Expand Down Expand Up @@ -195,17 +205,3 @@ func IBCPacketFixture(mutators ...func(p *channeltypes.Packet)) channeltypes.Pac
}
return r
}

var _ types.IBCContractKeeper = &IBCContractKeeperMock{}

type IBCContractKeeperMock struct {
types.IBCContractKeeper
OnRecvPacketFn func(ctx sdk.Context, contractAddr sdk.AccAddress, msg wasmvmtypes.IBCPacketReceiveMsg) (ibcexported.Acknowledgement, error)
}

func (m IBCContractKeeperMock) OnRecvPacket(ctx sdk.Context, contractAddr sdk.AccAddress, msg wasmvmtypes.IBCPacketReceiveMsg) (ibcexported.Acknowledgement, error) {
if m.OnRecvPacketFn == nil {
panic("not expected to be called")
}
return m.OnRecvPacketFn(ctx, contractAddr, msg)
}
51 changes: 51 additions & 0 deletions x/wasm/ibctesting/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,57 @@ func (path *Path) RelayPacket(packet channeltypes.Packet, _ []byte) error {
return fmt.Errorf("packet commitment does not exist on either endpoint for provided packet")
}

// RelayPacketWithoutAck attempts to relay the packet first on EndpointA and then on EndpointB
// if EndpointA does not contain a packet commitment for that packet. An error is returned
// if a relay step fails or the packet commitment does not exist on either endpoint.
// In contrast to RelayPacket, this function does not acknowledge the packet and expects it to have no acknowledgement yet.
// It is useful for testing async acknowledgement.
func (path *Path) RelayPacketWithoutAck(packet channeltypes.Packet, _ []byte) error {
pc := path.EndpointA.Chain.App.GetIBCKeeper().ChannelKeeper.GetPacketCommitment(path.EndpointA.Chain.GetContext(), packet.GetSourcePort(), packet.GetSourceChannel(), packet.GetSequence())
if bytes.Equal(pc, channeltypes.CommitPacket(path.EndpointA.Chain.App.AppCodec(), packet)) {

// packet found, relay from A to B
if err := path.EndpointB.UpdateClient(); err != nil {
return err
}

res, err := path.EndpointB.RecvPacketWithResult(packet)
if err != nil {
return err
}

_, err = ParseAckFromEvents(res.GetEvents())
if err == nil {
return fmt.Errorf("tried to relay packet without ack but got ack")
}

return nil
}

pc = path.EndpointB.Chain.App.GetIBCKeeper().ChannelKeeper.GetPacketCommitment(path.EndpointB.Chain.GetContext(), packet.GetSourcePort(), packet.GetSourceChannel(), packet.GetSequence())
if bytes.Equal(pc, channeltypes.CommitPacket(path.EndpointB.Chain.App.AppCodec(), packet)) {

// packet found, relay B to A
if err := path.EndpointA.UpdateClient(); err != nil {
return err
}

res, err := path.EndpointA.RecvPacketWithResult(packet)
if err != nil {
return err
}

_, err = ParseAckFromEvents(res.GetEvents())
if err == nil {
return fmt.Errorf("tried to relay packet without ack but got ack")
}

return nil
}

return fmt.Errorf("packet commitment does not exist on either endpoint for provided packet")
}

// SendMsg delivers the provided messages to the chain. The counterparty
// client is updated with the new source consensus state.
func (path *Path) SendMsg(msgs ...sdk.Msg) error {
Expand Down
Loading