Skip to content

Commit

Permalink
feat(vstorage): batch and dedupe events per block
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelfig committed Jan 26, 2023
1 parent 1739e7d commit 312b608
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 20 deletions.
7 changes: 4 additions & 3 deletions golang/cosmos/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,8 +572,8 @@ func NewAgoricApp(
params.NewAppModule(app.ParamsKeeper),
transferModule,
icaModule,
swingset.NewAppModule(app.SwingSetKeeper),
vstorage.NewAppModule(app.VstorageKeeper),
swingset.NewAppModule(app.SwingSetKeeper),
vibcModule,
vbankModule,
lienModule,
Expand Down Expand Up @@ -605,14 +605,13 @@ func NewAgoricApp(
feegrant.ModuleName,
paramstypes.ModuleName,
vestingtypes.ModuleName,
swingset.ModuleName,
vstorage.ModuleName,
swingset.ModuleName,
vibc.ModuleName,
vbank.ModuleName,
lien.ModuleName,
)
app.mm.SetOrderEndBlockers(
vstorage.ModuleName,
vibc.ModuleName,
vbank.ModuleName,
lien.ModuleName,
Expand All @@ -637,6 +636,8 @@ func NewAgoricApp(
vestingtypes.ModuleName,
// SwingSet needs to be last, for it to capture all the pushed actions.
swingset.ModuleName,
// And then vstorage, to produce SwingSet-induced events.
vstorage.ModuleName,
)

// NOTE: The genutils module must occur after staking so that pools are
Expand Down
126 changes: 109 additions & 17 deletions golang/cosmos/x/vstorage/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package keeper
import (
"bytes"
"encoding/json"
"sort"
"strconv"
"strings"

Expand All @@ -23,14 +24,85 @@ type StreamCell struct {
Values []string `json:"values"`
}

type ProposedChange struct {
Path string
ValueFromLastBlock string
NewValue string
LegacyEvents bool
}

type ChangeManager interface {
Track(ctx sdk.Context, k Keeper, path, value string, isLegacy bool)
EmitEvents(ctx sdk.Context, k Keeper)
Rollback(ctx sdk.Context)
}

type BatchingChangeManager struct {
// Map from storage path to proposed change.
changes map[string]*ProposedChange
}

var _ ChangeManager = (*BatchingChangeManager)(nil)

// Keeper maintains the link to data storage and exposes getter/setter methods
// for the various parts of the state machine
type Keeper struct {
storeKey sdk.StoreKey
changeManager ChangeManager
storeKey sdk.StoreKey
}

func (bcm *BatchingChangeManager) Track(ctx sdk.Context, k Keeper, path, value string, isLegacy bool) {
if change, ok := bcm.changes[path]; ok {
change.NewValue = value
if isLegacy {
change.LegacyEvents = true
}
return
}
bcm.changes[path] = &ProposedChange{
Path: path,
NewValue: value,
ValueFromLastBlock: k.GetData(ctx, path),
LegacyEvents: isLegacy,
}
}

func (bcm *BatchingChangeManager) Rollback(ctx sdk.Context) {
bcm.changes = make(map[string]*ProposedChange)
}

// EmitEvents emits events for all actual changes.
// This does not clear the cache, so the caller must call Rollback() to do so.
func (bcm *BatchingChangeManager) EmitEvents(ctx sdk.Context, k Keeper) {
changes := bcm.changes
if len(changes) == 0 {
return
}

// Deterministic order.
sortedPaths := make([]string, 0, len(changes))
for path := range changes {
sortedPaths = append(sortedPaths, path)
}
sort.Strings(sortedPaths)

for _, path := range sortedPaths {
change := bcm.changes[path]
k.EmitChange(ctx, change)
}
}

// The BatchingChangeManager needs to be a pointer because its state is mutated.
func NewBatchingChangeManager() *BatchingChangeManager {
bcm := BatchingChangeManager{changes: make(map[string]*ProposedChange)}
return &bcm
}

func NewKeeper(storeKey sdk.StoreKey) Keeper {
return Keeper{storeKey}
return Keeper{
storeKey: storeKey,
changeManager: NewBatchingChangeManager(),
}
}

// ExportStorage fetches all storage
Expand Down Expand Up @@ -61,6 +133,29 @@ func (k Keeper) ImportStorage(ctx sdk.Context, entries []*types.DataEntry) {
}
}

func (k Keeper) EmitChange(ctx sdk.Context, change *ProposedChange) {
if change.NewValue == change.ValueFromLastBlock {
// No change.
return
}

if change.LegacyEvents {
// Emit the legacy change event.
ctx.EventManager().EmitEvent(
types.NewLegacyStorageEvent(change.Path, change.NewValue),
)
}

// Emit the new state change event.
ctx.EventManager().EmitEvent(
agoric.NewStateChangeEvent(
k.GetStoreName(),
k.PathToEncodedKey(change.Path),
[]byte(change.NewValue),
),
)
}

// GetData gets generic storage. The default value is an empty string.
func (k Keeper) GetData(ctx sdk.Context, path string) string {
//fmt.Printf("GetData(%s)\n", path);
Expand Down Expand Up @@ -117,26 +212,23 @@ func (k Keeper) HasChildren(ctx sdk.Context, path string) bool {
return iterator.Valid()
}

func (k Keeper) LegacySetStorageAndNotify(ctx sdk.Context, path, value string) {
k.SetStorage(ctx, path, value)
func (k Keeper) NewChangeBatch(ctx sdk.Context) {
k.changeManager.Rollback(ctx)
}

// Emit the legacy change event.
ctx.EventManager().EmitEvent(
types.NewLegacyStorageEvent(path, value),
)
func (k Keeper) FlushChangeEvents(ctx sdk.Context) {
k.changeManager.EmitEvents(ctx, k)
k.changeManager.Rollback(ctx)
}

func (k Keeper) SetStorageAndNotify(ctx sdk.Context, path, value string) {
k.LegacySetStorageAndNotify(ctx, path, value)
k.changeManager.Track(ctx, k, path, value, false)
k.SetStorage(ctx, path, value)
}

// Emit the new state change event.
ctx.EventManager().EmitEvent(
agoric.NewStateChangeEvent(
k.GetStoreName(),
k.PathToEncodedKey(path),
[]byte(value),
),
)
func (k Keeper) LegacySetStorageAndNotify(ctx sdk.Context, path, value string) {
k.changeManager.Track(ctx, k, path, value, true)
k.SetStorage(ctx, path, value)
}

func (k Keeper) AppendStorageValueAndNotify(ctx sdk.Context, path, value string) error {
Expand Down
88 changes: 88 additions & 0 deletions golang/cosmos/x/vstorage/keeper/keeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
storetypes "github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"

abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
dbm "github.com/tendermint/tm-db"
Expand Down Expand Up @@ -169,3 +170,90 @@ func TestStorage(t *testing.T) {
}
keeper.ImportStorage(ctx, got)
}

func TestStorageNotify(t *testing.T) {
tk := makeTestKit()
ctx, keeper := tk.ctx, tk.vstorageKeeper

keeper.SetStorageAndNotify(ctx, "notify.noLegacy", "noLegacyValue")
keeper.LegacySetStorageAndNotify(ctx, "notify.legacy", "legacyValue")
keeper.SetStorageAndNotify(ctx, "notify.noLegacy2", "noLegacyValue2")
keeper.SetStorageAndNotify(ctx, "notify.legacy2", "legacyValue2")
keeper.LegacySetStorageAndNotify(ctx, "notify.legacy2", "legacyValue2b")
keeper.SetStorageAndNotify(ctx, "notify.noLegacy2", "noLegacyValue2b")

// Check the batched events.
expectedBeforeFlushEvents := sdk.Events{}
if got := ctx.EventManager().Events(); !reflect.DeepEqual(got, expectedBeforeFlushEvents) {
t.Errorf("got before flush events %#v, want %#v", got, expectedBeforeFlushEvents)
}

expectedAfterFlushEvents := sdk.Events{
{
Type: "storage",
Attributes: []abci.EventAttribute{
{Key: []byte("path"), Value: []byte("notify.legacy")},
{Key: []byte("value"), Value: []byte("legacyValue")},
},
},
{
Type: "state_change",
Attributes: []abci.EventAttribute{
{Key: []byte("store"), Value: []byte("vstorage")},
{Key: []byte("key"), Value: []byte("2\x00notify\x00legacy")},
{Key: []byte("anckey"), Value: []byte("\x012\x00notify\x00legacy\x01")},
{Key: []byte("value"), Value: []byte("legacyValue")},
},
},
{
Type: "storage",
Attributes: []abci.EventAttribute{
{Key: []byte("path"), Value: []byte("notify.legacy2")},
{Key: []byte("value"), Value: []byte("legacyValue2b")},
},
},
{
Type: "state_change",
Attributes: []abci.EventAttribute{
{Key: []byte("store"), Value: []byte("vstorage")},
{Key: []byte("key"), Value: []byte("2\x00notify\x00legacy2")},
{Key: []byte("anckey"), Value: []byte("\x012\x00notify\x00legacy2\x01")},
{Key: []byte("value"), Value: []byte("legacyValue2b")},
},
},
{
Type: "state_change",
Attributes: []abci.EventAttribute{
{Key: []byte("store"), Value: []byte("vstorage")},
{Key: []byte("key"), Value: []byte("2\x00notify\x00noLegacy")},
{Key: []byte("anckey"), Value: []byte("\x012\x00notify\x00noLegacy\x01")},
{Key: []byte("value"), Value: []byte("noLegacyValue")},
},
},
{
Type: "state_change",
Attributes: []abci.EventAttribute{
{Key: []byte("store"), Value: []byte("vstorage")},
{Key: []byte("key"), Value: []byte("2\x00notify\x00noLegacy2")},
{Key: []byte("anckey"), Value: []byte("\x012\x00notify\x00noLegacy2\x01")},
{Key: []byte("value"), Value: []byte("noLegacyValue2b")},
},
},
}

keeper.FlushChangeEvents(ctx)
if got := ctx.EventManager().Events(); !reflect.DeepEqual(got, expectedAfterFlushEvents) {
for _, e := range got {
t.Logf("got event: %s", e.Type)
for _, a := range e.Attributes {
t.Logf("got attr: %s = %s", a.Key, a.Value)
}
}
t.Errorf("got after flush events %#v, want %#v", got, expectedAfterFlushEvents)
}

keeper.FlushChangeEvents(ctx)
if got := ctx.EventManager().Events(); !reflect.DeepEqual(got, expectedAfterFlushEvents) {
t.Errorf("got after second flush events %#v, want %#v", got, expectedAfterFlushEvents)
}
}
2 changes: 2 additions & 0 deletions golang/cosmos/x/vstorage/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,11 @@ func (am AppModule) RegisterServices(cfg module.Configurator) {
func (AppModule) ConsensusVersion() uint64 { return 1 }

func (am AppModule) BeginBlock(ctx sdk.Context, req abci.RequestBeginBlock) {
am.keeper.NewChangeBatch(ctx)
}

func (am AppModule) EndBlock(ctx sdk.Context, req abci.RequestEndBlock) []abci.ValidatorUpdate {
am.keeper.FlushChangeEvents(ctx)
// Prevent Cosmos SDK internal errors.
return []abci.ValidatorUpdate{}
}
Expand Down

0 comments on commit 312b608

Please sign in to comment.