Skip to content

Commit

Permalink
register cache listener later
Browse files Browse the repository at this point in the history
  • Loading branch information
scottburch committed Mar 2, 2021
1 parent 2f07ca7 commit 1b02215
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 31 deletions.
1 change: 1 addition & 0 deletions x/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func StartAggregator(aggKeeper Keeper) {
}

func run(aggKeeper Keeper) {
aggKeeper.RegisterValueUpdatedListener()
c := cron.New()
c.AddFunc("* * * * *", func() {
time.AfterFunc(40 * time.Second, func() {
Expand Down
28 changes: 2 additions & 26 deletions x/aggregator/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,47 +3,23 @@ package aggregator
import (
"fmt"

sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
"github.com/bluzelle/curium/x/aggregator/keeper"
"github.com/bluzelle/curium/x/aggregator/types"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)

// NewHandler creates an sdk.Handler for all the aggregator type messages
func NewHandler(k keeper.Keeper) sdk.Handler {
return func(ctx sdk.Context, msg sdk.Msg) (*sdk.Result, error) {
ctx = ctx.WithEventManager(sdk.NewEventManager())
switch msg := msg.(type) {
// this line is used by starport scaffolding # 1
// TODO: Define your msg cases
//
//Example:
// case Msg<Action>:
// return handleMsg<Action>(ctx, k, msg)
default:
errMsg := fmt.Sprintf("unrecognized %s message type: %T", types.ModuleName, msg)
return nil, sdkerrors.Wrap(sdkerrors.ErrUnknownRequest, errMsg)
}
}
}

// handle<Action> does x
/*
func handleMsg<Action>(ctx sdk.Context, k keeper.Keeper, msg Msg<Action>) (*sdk.Result, error) {
err := k.<Action>(ctx, msg.ValidatorAddr)
if err != nil {
return nil, err
}

// TODO: Define your msg events
ctx.EventManager().EmitEvent(
sdk.NewEvent(
sdk.EventTypeMessage,
sdk.NewAttribute(sdk.AttributeKeyModule, AttributeValueCategory),
sdk.NewAttribute(sdk.AttributeKeySender, msg.ValidatorAddr.String()),
),
)

return &sdk.Result{Events: ctx.EventManager().Events()}, nil
}
*/
6 changes: 5 additions & 1 deletion x/aggregator/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@ func NewKeeper(cdc *codec.Codec, oracleKeeper oracle.Keeper, valueQueueStoreKey
cdc: cdc,
// paramspace: paramspace.WithKeyTable(types.ParamKeyTable()),
}
oracleKeeper.RegisterValueUpdatedListener(keeper.SourceValueUpdatedListener)
return keeper
}

func (k Keeper) RegisterValueUpdatedListener() {
k.oracleKeeper.RegisterValueUpdatedListener(k.SourceValueUpdatedListener)
}

// Logger returns a module-specific logger.
func (k Keeper) Logger(ctx sdk.Context) log.Logger {
return ctx.Logger().With("module", fmt.Sprintf("x/%s", types.ModuleName))
Expand Down Expand Up @@ -71,6 +74,7 @@ func (k Keeper) VisitQueueItems(ctx sdk.Context, cb func(oracle.SourceValue)) {


func (k Keeper) SourceValueUpdatedListener(ctx sdk.Context, value oracle.SourceValue) {
logger.Info("Adding source value to agg queue", "batch", value.Batch, "source", value.SourceName)
k.AddQueueItem(ctx, value)
}

Expand Down
6 changes: 2 additions & 4 deletions x/oracle/keeper/keeper_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ package keeper

import (
"fmt"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/bluzelle/curium/x/oracle/types"
"time"
storeIterator "github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
)

func (k Keeper) RegisterValueUpdatedListener(listener types.ValueUpdateListener) {
Expand All @@ -31,8 +30,7 @@ func (k Keeper) UpdateSourceValue(ctx sdk.Context, vote types.Vote) {
store.Set([]byte(key), k.cdc.MustMarshalBinaryBare(sourceValue))

for _, listener := range valueUpdateListeners {
// Delay the execution of the listener so it happens after the block is committed
time.AfterFunc(time.Second, func() {listener(ctx, sourceValue)})
listener(ctx, sourceValue)
}
}

Expand Down

0 comments on commit 1b02215

Please sign in to comment.