Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: optimize pending packets storage on consumer + migration #1037

Merged
merged 29 commits into from Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
5bb3e14
wip
shaspitz Jun 19, 2023
6b6eabe
tests
shaspitz Jun 19, 2023
84e0250
tests
shaspitz Jun 19, 2023
0eafb05
update genesis tests
shaspitz Jun 19, 2023
1af896f
comments
shaspitz Jun 19, 2023
cf2359d
migration and changelog
shaspitz Jun 19, 2023
1c7e7df
migration test
shaspitz Jun 19, 2023
84d4a75
Merge branch 'main' into shawn/optimize-pending-packets-storage
shaspitz Jun 20, 2023
98dd226
lints
shaspitz Jun 20, 2023
35a464b
Merge branch 'main' into shawn/optimize-pending-packets-storage
shaspitz Jun 21, 2023
d4393e9
merge fixes
shaspitz Jun 21, 2023
18f852a
clean
shaspitz Jun 21, 2023
0e1bd28
Merge branch 'main' into shawn/optimize-pending-packets-storage
shaspitz Jun 22, 2023
9737d0f
Update ccv.pb.go
shaspitz Jun 22, 2023
189c1da
add to ADR
shaspitz Jun 22, 2023
1c33106
address some PR comments
shaspitz Jun 23, 2023
394f709
Merge branch 'main' into shawn/optimize-pending-packets-storage
shaspitz Jun 23, 2023
a42229a
comment
shaspitz Jun 26, 2023
05acaba
Merge branch 'main' into shawn/optimize-pending-packets-storage
shaspitz Jun 29, 2023
d918fb4
Merge branch 'main' into shawn/optimize-pending-packets-storage
shaspitz Jul 1, 2023
f517201
Update ccv.pb.go
shaspitz Jul 1, 2023
e3f2133
Merge branch 'main' into shawn/optimize-pending-packets-storage
shaspitz Jul 6, 2023
747e8aa
Merge branch 'main' into shawn/optimize-pending-packets-storage
shaspitz Jul 10, 2023
d84b29d
lint
shaspitz Jul 11, 2023
8084b49
Merge branch 'main' into shawn/optimize-pending-packets-storage
shaspitz Jul 13, 2023
ab23828
Update x/ccv/consumer/keeper/keeper.go
shaspitz Jul 13, 2023
98c64d4
Merge branch 'main' into shawn/optimize-pending-packets-storage
shaspitz Jul 13, 2023
db04b8f
byte wise
shaspitz Jul 13, 2023
e6f696e
Merge branch 'shawn/optimize-pending-packets-storage' of https://gith…
shaspitz Jul 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion proto/interchain_security/ccv/v1/ccv.proto
Expand Up @@ -58,17 +58,19 @@ message MaturedUnbondingOps {
repeated uint64 ids = 1;
}

// ConsumerPacketData contains a consumer packet data and a type tag
// ConsumerPacketData contains a consumer packet data, type tag, and index for storage.
message ConsumerPacketData {
ConsumerPacketDataType type = 1;

oneof data {
SlashPacketData slashPacketData = 2;
VSCMaturedPacketData vscMaturedPacketData = 3;
}
uint64 idx = 4;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra care should be taken in reviewing this PR, as it does mutate a proto file. Note this index field was added to the ConsumerPacketData type so that we can use that index in PendingDataPacketsKey, to implement a queue with constant append time. See AppendPendingPacket

}


// [Depreciated] favor using []ConsumerPacketData directly.
// ConsumerPacketDataList is a list of consumer packet data packets.
message ConsumerPacketDataList {
repeated ConsumerPacketData list = 1
Expand Down
5 changes: 2 additions & 3 deletions tests/integration/expired_client.go
Expand Up @@ -122,7 +122,7 @@ func (s *CCVTestSuite) TestConsumerPacketSendExpiredClient() {
// check that the packets were added to the list of pending data packets
consumerPackets := consumerKeeper.GetPendingPackets(s.consumerCtx())
s.Require().NotEmpty(consumerPackets)
s.Require().Equal(2, len(consumerPackets.GetList()), "unexpected number of pending data packets")
s.Require().Len(consumerPackets, 2, "unexpected number of pending data packets")

// try to send slash packet for downtime infraction
addr := ed25519.GenPrivKey().PubKey().Address()
Expand All @@ -136,7 +136,7 @@ func (s *CCVTestSuite) TestConsumerPacketSendExpiredClient() {
// check that the packets were added to the list of pending data packets
consumerPackets = consumerKeeper.GetPendingPackets(s.consumerCtx())
s.Require().NotEmpty(consumerPackets)
s.Require().Equal(4, len(consumerPackets.GetList()), "unexpected number of pending data packets")
s.Require().Len(consumerPackets, 4, "unexpected number of pending data packets")

// upgrade expired client to the consumer
upgradeExpiredClient(s, Provider)
Expand All @@ -147,7 +147,6 @@ func (s *CCVTestSuite) TestConsumerPacketSendExpiredClient() {
// check that the list of pending data packets is emptied
consumerPackets = consumerKeeper.GetPendingPackets(s.consumerCtx())
s.Require().Empty(consumerPackets)
s.Require().Equal(0, len(consumerPackets.GetList()), "unexpected number of pending data packets")

// relay all packet from consumer to provider
relayAllCommittedPackets(s, s.consumerChain, s.path, ccv.ConsumerPortID, s.path.EndpointA.ChannelID, 4)
Expand Down
24 changes: 12 additions & 12 deletions tests/integration/slashing.go
Expand Up @@ -486,15 +486,15 @@ func (suite *CCVTestSuite) TestValidatorDowntime() {

// check that slash packet is queued
pendingPackets := consumerKeeper.GetPendingPackets(ctx)
suite.Require().NotEmpty(pendingPackets.List, "pending packets empty")
suite.Require().Len(pendingPackets.List, 1, "pending packets len should be 1 is %d", len(pendingPackets.List))
suite.Require().NotEmpty(pendingPackets, "pending packets empty")
suite.Require().Len(pendingPackets, 1, "pending packets len should be 1 is %d", len(pendingPackets))

// clear queue, commit packets
suite.consumerApp.GetConsumerKeeper().SendPackets(ctx)

// check queue was cleared
pendingPackets = suite.consumerApp.GetConsumerKeeper().GetPendingPackets(ctx)
suite.Require().Empty(pendingPackets.List, "pending packets NOT empty")
suite.Require().Empty(pendingPackets, "pending packets NOT empty")

// verify that the slash packet was sent
gotCommit := consumerIBCKeeper.ChannelKeeper.GetPacketCommitment(ctx, ccv.ConsumerPortID, channelID, seq)
Expand Down Expand Up @@ -573,15 +573,15 @@ func (suite *CCVTestSuite) TestValidatorDoubleSigning() {

// check slash packet is queued
pendingPackets := suite.consumerApp.GetConsumerKeeper().GetPendingPackets(ctx)
suite.Require().NotEmpty(pendingPackets.List, "pending packets empty")
suite.Require().Len(pendingPackets.List, 1, "pending packets len should be 1 is %d", len(pendingPackets.List))
suite.Require().NotEmpty(pendingPackets, "pending packets empty")
suite.Require().Len(pendingPackets, 1, "pending packets len should be 1 is %d", len(pendingPackets))

// clear queue, commit packets
suite.consumerApp.GetConsumerKeeper().SendPackets(ctx)

// check queue was cleared
pendingPackets = suite.consumerApp.GetConsumerKeeper().GetPendingPackets(ctx)
suite.Require().Empty(pendingPackets.List, "pending packets NOT empty")
suite.Require().Empty(pendingPackets, "pending packets NOT empty")

// check slash packet is sent
gotCommit := suite.consumerApp.GetIBCKeeper().ChannelKeeper.GetPacketCommitment(ctx, ccv.ConsumerPortID, channelID, seq)
Expand Down Expand Up @@ -636,7 +636,7 @@ func (suite *CCVTestSuite) TestQueueAndSendSlashPacket() {
// the downtime slash request duplicates
dataPackets := consumerKeeper.GetPendingPackets(ctx)
suite.Require().NotEmpty(dataPackets)
suite.Require().Len(dataPackets.GetList(), 12)
suite.Require().Len(dataPackets, 12)

// save consumer next sequence
seq, _ := consumerIBCKeeper.ChannelKeeper.GetNextSequenceSend(ctx, ccv.ConsumerPortID, channelID)
Expand All @@ -663,7 +663,7 @@ func (suite *CCVTestSuite) TestQueueAndSendSlashPacket() {
// check that pending data packets got cleared
dataPackets = consumerKeeper.GetPendingPackets(ctx)
suite.Require().Empty(dataPackets)
suite.Require().Len(dataPackets.GetList(), 0)
suite.Require().Len(dataPackets, 0)
}

// TestCISBeforeCCVEstablished tests that the consumer chain doesn't panic or
Expand All @@ -674,14 +674,14 @@ func (suite *CCVTestSuite) TestCISBeforeCCVEstablished() {

// Check pending packets is empty
pendingPackets := consumerKeeper.GetPendingPackets(suite.consumerCtx())
suite.Require().Len(pendingPackets.List, 0)
suite.Require().Len(pendingPackets, 0)

consumerKeeper.Slash(suite.consumerCtx(), []byte{0x01, 0x02, 0x3},
66, 4324, sdk.MustNewDecFromStr("0.05"), stakingtypes.Downtime)

// Check slash packet was queued
pendingPackets = consumerKeeper.GetPendingPackets(suite.consumerCtx())
suite.Require().Len(pendingPackets.List, 1)
suite.Require().Len(pendingPackets, 1)

// Pass 5 blocks, confirming the consumer doesn't panic
for i := 0; i < 5; i++ {
Expand All @@ -690,7 +690,7 @@ func (suite *CCVTestSuite) TestCISBeforeCCVEstablished() {

// Check packet is still queued
pendingPackets = consumerKeeper.GetPendingPackets(suite.consumerCtx())
suite.Require().Len(pendingPackets.List, 1)
suite.Require().Len(pendingPackets, 1)

// establish ccv channel
suite.SetupCCVChannel(suite.path)
Expand All @@ -699,5 +699,5 @@ func (suite *CCVTestSuite) TestCISBeforeCCVEstablished() {
// Pass one more block, and confirm the packet is sent now that ccv channel is established
suite.consumerChain.NextBlock()
pendingPackets = consumerKeeper.GetPendingPackets(suite.consumerCtx())
suite.Require().Len(pendingPackets.List, 0)
suite.Require().Len(pendingPackets, 0)
}
16 changes: 12 additions & 4 deletions x/ccv/consumer/keeper/genesis.go
Expand Up @@ -89,9 +89,12 @@ func (k Keeper) InitGenesis(ctx sdk.Context, state *consumertypes.GenesisState)
k.SetLastTransmissionBlockHeight(ctx, state.LastTransmissionBlockHeight)
}

// set pending consumer pending packets
// Set pending consumer packets, using the depreciated ConsumerPacketDataList type
// that exists for genesis.
// note that the list includes pending mature VSC packet only if the handshake is completed
k.AppendPendingPacket(ctx, state.PendingConsumerPackets.List...)
for _, packet := range state.PendingConsumerPackets.List {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Genesis methods still expect the depreciated type ConsumerPacketDataList to reduce the complexity of how we'd actually deploy this PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment on ccv.proto

k.AppendPendingPacket(ctx, packet.Type, packet.Data)
}

// set height to valset update id mapping
for _, h2v := range state.HeightToValsetUpdateId {
Expand Down Expand Up @@ -122,6 +125,11 @@ func (k Keeper) ExportGenesis(ctx sdk.Context) (genesis *consumertypes.GenesisSt
// export the current validator set
valset := k.MustGetCurrentValidatorsAsABCIUpdates(ctx)

// export pending packets using the depreciated ConsumerPacketDataList type
pendingPackets := k.GetPendingPackets(ctx)
pendingPacketsDepreciated := ccv.ConsumerPacketDataList{}
pendingPacketsDepreciated.List = append(pendingPacketsDepreciated.List, pendingPackets...)

// export all the states created after a provider channel got established
if channelID, ok := k.GetProviderChannel(ctx); ok {
clientID, found := k.GetProviderClientID(ctx)
Expand All @@ -136,7 +144,7 @@ func (k Keeper) ExportGenesis(ctx sdk.Context) (genesis *consumertypes.GenesisSt
k.GetAllPacketMaturityTimes(ctx),
valset,
k.GetAllHeightToValsetUpdateIDs(ctx),
k.GetPendingPackets(ctx),
pendingPacketsDepreciated,
k.GetAllOutstandingDowntimes(ctx),
k.GetLastTransmissionBlockHeight(ctx),
params,
Expand All @@ -156,7 +164,7 @@ func (k Keeper) ExportGenesis(ctx sdk.Context) (genesis *consumertypes.GenesisSt
nil,
valset,
k.GetAllHeightToValsetUpdateIDs(ctx),
k.GetPendingPackets(ctx),
pendingPacketsDepreciated,
nil,
consumertypes.LastTransmissionBlockHeight{},
params,
Expand Down
27 changes: 23 additions & 4 deletions x/ccv/consumer/keeper/genesis_test.go
Expand Up @@ -148,7 +148,12 @@ func TestInitGenesis(t *testing.T) {
func(ctx sdk.Context, ck consumerkeeper.Keeper, gs *consumertypes.GenesisState) {
assertConsumerPortIsBound(t, ctx, &ck)

require.Equal(t, pendingDataPackets, ck.GetPendingPackets(ctx))
obtainedPendingPackets := ck.GetPendingPackets(ctx)
for idx, expectedPacketData := range pendingDataPackets.List {
require.Equal(t, expectedPacketData.Type, obtainedPendingPackets[idx].Type)
require.Equal(t, expectedPacketData.Data, obtainedPendingPackets[idx].Data)
}

assertHeightValsetUpdateIDs(t, ctx, &ck, defaultHeightValsetUpdateIDs)
assertProviderClientID(t, ctx, &ck, provClientID)
require.Equal(t, validator.Address.Bytes(), ck.GetAllCCValidator(ctx)[0].Address)
Expand Down Expand Up @@ -186,7 +191,12 @@ func TestInitGenesis(t *testing.T) {
require.Equal(t, provChannelID, gotChannelID)

require.True(t, ck.PacketMaturityTimeExists(ctx, matPackets[0].VscId, matPackets[0].MaturityTime))
require.Equal(t, pendingDataPackets, ck.GetPendingPackets(ctx))

obtainedPendingPackets := ck.GetPendingPackets(ctx)
for idx, expectedPacketData := range pendingDataPackets.List {
require.Equal(t, expectedPacketData.Type, obtainedPendingPackets[idx].Type)
require.Equal(t, expectedPacketData.Data, obtainedPendingPackets[idx].Data)
}

require.Equal(t, gs.OutstandingDowntimeSlashing, ck.GetAllOutstandingDowntimes(ctx))

Expand Down Expand Up @@ -252,12 +262,16 @@ func TestExportGenesis(t *testing.T) {
Data: &ccv.ConsumerPacketData_SlashPacketData{
SlashPacketData: ccv.NewSlashPacketData(abciValidator, vscID, stakingtypes.Downtime),
},
Idx: 0,
},
{
Type: ccv.VscMaturedPacket,
Data: &ccv.ConsumerPacketData_VscMaturedPacketData{
VscMaturedPacketData: ccv.NewVSCMaturedPacketData(vscID),
},
// This idx is a part of the expected genesis state.
// If the keeper is correctly storing consumer packet data, indexes should be populated.
Idx: 1,
},
},
}
Expand Down Expand Up @@ -291,7 +305,10 @@ func TestExportGenesis(t *testing.T) {
ck.SetCCValidator(ctx, cVal)
ck.SetParams(ctx, params)

ck.AppendPendingPacket(ctx, consPackets.List...)
for _, packet := range consPackets.List {
ck.AppendPendingPacket(ctx, packet.Type, packet.Data)
}

ck.SetHeightValsetUpdateID(ctx, defaultHeightValsetUpdateIDs[0].Height, defaultHeightValsetUpdateIDs[0].ValsetUpdateId)
},
consumertypes.NewRestartGenesisState(
Expand Down Expand Up @@ -321,7 +338,9 @@ func TestExportGenesis(t *testing.T) {
ck.SetHeightValsetUpdateID(ctx, updatedHeightValsetUpdateIDs[0].Height, updatedHeightValsetUpdateIDs[0].ValsetUpdateId)
ck.SetHeightValsetUpdateID(ctx, updatedHeightValsetUpdateIDs[1].Height, updatedHeightValsetUpdateIDs[1].ValsetUpdateId)

ck.AppendPendingPacket(ctx, consPackets.List...)
for _, packet := range consPackets.List {
ck.AppendPendingPacket(ctx, packet.Type, packet.Data)
}

// populate the required states for an established CCV channel
ck.SetPacketMaturityTime(ctx, matPackets[0].VscId, matPackets[0].MaturityTime)
Expand Down
88 changes: 59 additions & 29 deletions x/ccv/consumer/keeper/keeper.go
Expand Up @@ -565,48 +565,78 @@ func (k Keeper) GetAllCCValidator(ctx sdk.Context) (validators []types.CrossChai
return validators
}

// SetPendingPackets sets the pending CCV packets
func (k Keeper) SetPendingPackets(ctx sdk.Context, packets ccv.ConsumerPacketDataList) {
store := ctx.KVStore(k.storeKey)
bz, err := packets.Marshal()
if err != nil {
// This should never happen
panic(fmt.Errorf("failed to marshal ConsumerPacketDataList: %w", err))
}
store.Set(types.PendingDataPacketsKey(), bz)
// Note: PendingDataPacketsBytePrefix is the correct prefix, NOT PendingDataPacketsByteKey.
// See consistency with PendingDataPacketsKey().
shaspitz marked this conversation as resolved.
Show resolved Hide resolved
func PendingDataPacketsIterator(store sdk.KVStore) sdk.Iterator {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The meat of this PR is in this file

return sdk.KVStorePrefixIterator(store, []byte{types.PendingDataPacketsBytePrefix})
}

// GetPendingPackets returns the pending CCV packets from the store
func (k Keeper) GetPendingPackets(ctx sdk.Context) ccv.ConsumerPacketDataList {
var packets ccv.ConsumerPacketDataList

func (k Keeper) getAndIncrementPendingPacketsIdx(ctx sdk.Context) (toReturn uint64) {
store := ctx.KVStore(k.storeKey)
bz := store.Get(types.PendingDataPacketsKey())
bz := store.Get(types.PendingPacketsIndexKey())
if bz == nil {
return packets
toReturn = 0
} else {
toReturn = binary.BigEndian.Uint64(bz)
shaspitz marked this conversation as resolved.
Show resolved Hide resolved
}
toStore := toReturn + 1
store.Set(types.PendingPacketsIndexKey(), sdk.Uint64ToBigEndian(toStore))
return toReturn
}

err := packets.Unmarshal(bz)
if err != nil {
// An error here would indicate something is very wrong,
// the PendingPackets are assumed to be correctly serialized in SetPendingPackets.
panic(fmt.Errorf("failed to unmarshal pending data packets: %w", err))
// GetPendingPackets returns ALL the pending CCV packets from the store
func (k Keeper) GetPendingPackets(ctx sdk.Context) []ccv.ConsumerPacketData {
var packets []ccv.ConsumerPacketData
store := ctx.KVStore(k.storeKey)
iterator := PendingDataPacketsIterator(store)
shaspitz marked this conversation as resolved.
Show resolved Hide resolved
defer iterator.Close()
for ; iterator.Valid(); iterator.Next() {
var packet ccv.ConsumerPacketData
bz := iterator.Value()
err := packet.Unmarshal(bz)
if err != nil {
// An error here would indicate something is very wrong,
panic(fmt.Errorf("failed to unmarshal pending data packet: %w", err))
}
packets = append(packets, packet)
}

return packets
}

// DeletePendingDataPackets clears the pending data packets in store
func (k Keeper) DeletePendingDataPackets(ctx sdk.Context) {
// DeletePendingDataPackets deletes pending data packets with given indexes
func (k Keeper) DeletePendingDataPackets(ctx sdk.Context, idxs ...uint64) {
store := ctx.KVStore(k.storeKey)
for _, idx := range idxs {
store.Delete(types.PendingDataPacketsKey(idx))
}
}

func (k Keeper) DeleteAllPendingDataPackets(ctx sdk.Context) {
shaspitz marked this conversation as resolved.
Show resolved Hide resolved
store := ctx.KVStore(k.storeKey)
store.Delete(types.PendingDataPacketsKey())
// Note: PendingDataPacketsBytePrefix is the correct prefix, NOT PendingDataPacketsByteKey.
// See consistency with PendingDataPacketsKey().
iterator := PendingDataPacketsIterator(store)
shaspitz marked this conversation as resolved.
Show resolved Hide resolved
defer iterator.Close()
for ; iterator.Valid(); iterator.Next() {
store.Delete(iterator.Key())
}
}

// AppendPendingDataPacket appends the given data packet to the pending data packets in store
func (k Keeper) AppendPendingPacket(ctx sdk.Context, packet ...ccv.ConsumerPacketData) {
pending := k.GetPendingPackets(ctx)
list := append(pending.GetList(), packet...)
k.SetPendingPackets(ctx, ccv.ConsumerPacketDataList{List: list})
// AppendPendingPacket enqueues the given data packet to the end of the pending data packets queue
func (k Keeper) AppendPendingPacket(ctx sdk.Context, packetType ccv.ConsumerPacketDataType, data ccv.ExportedIsConsumerPacketData_Data) {
cpd := ccv.NewConsumerPacketData(
packetType,
data,
k.getAndIncrementPendingPacketsIdx(ctx),
)
key := types.PendingDataPacketsKey(cpd.Idx)
store := ctx.KVStore(k.storeKey)
bz, err := cpd.Marshal()
if err != nil {
// This should never happen
panic(fmt.Errorf("failed to marshal ConsumerPacketData: %w", err))
}
store.Set(key, bz)
}

func (k Keeper) MarkAsPrevStandaloneChain(ctx sdk.Context) {
Expand Down