Skip to content

Commit

Permalink
changed aggregator to be more deterministic
Browse files Browse the repository at this point in the history
  • Loading branch information
scottburch committed Mar 6, 2021
1 parent 24017cc commit 0fc1669
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 8 deletions.
Binary file modified docs/oracle/Diagram1.dia
Binary file not shown.
5 changes: 1 addition & 4 deletions x/aggregator/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@ package aggregator
import (
"github.com/bluzelle/curium/x/aggregator/keeper"
sdk "github.com/cosmos/cosmos-sdk/types"
"time"
)


func EndBlocker(ctx sdk.Context, k keeper.Keeper) {
if time.Now().Second() > 40 {
k.AggregateValues(ctx)
}
k.AggregateValues(ctx)
}
29 changes: 25 additions & 4 deletions x/aggregator/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type AggregatorValue struct {
InSymbol string
Value sdk.Dec
Count int64
Height int64
}

var logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout))
Expand Down Expand Up @@ -60,7 +61,7 @@ func (k Keeper) GetAggValueStore(ctx sdk.Context) sdk.KVStore {
}

func MakeQueueItemKey(value AggregatorQueueItem) []byte {
return []byte(value.Batch + ">" + value.SourceName)
return []byte(BlockNumberToString(value.Height) + ">" + value.Batch + ">" + value.SourceName)
}

type AggregatorQueueItem struct {
Expand All @@ -69,6 +70,7 @@ type AggregatorQueueItem struct {
Symbol string
InSymbol string
Value sdk.Dec
Height int64
}

func (k Keeper) AddQueueItem(ctx sdk.Context, value oracle.SourceValue) {
Expand All @@ -79,6 +81,7 @@ func (k Keeper) AddQueueItem(ctx sdk.Context, value oracle.SourceValue) {
Symbol: parts[1],
InSymbol: parts[3],
Value: value.Value,
Height: value.Height,
}
key := MakeQueueItemKey(aggQueueItem)
store := k.GetQueueStore(ctx)
Expand Down Expand Up @@ -109,14 +112,28 @@ func (k Keeper) SourceValueUpdatedListener(ctx sdk.Context, value oracle.SourceV
k.AddQueueItem(ctx, value)
}

func (k Keeper) isQueueReadyForProcessing(ctx sdk.Context) bool {
const BLOCKS_TO_WAIT = 3
store := k.GetQueueStore(ctx)
iterator := store.ReverseIterator(nil, nil)
defer iterator.Close()
if iterator.Valid() {
var item AggregatorQueueItem
k.cdc.MustUnmarshalBinaryBare(iterator.Value(), &item)
return item.Height + BLOCKS_TO_WAIT < ctx.BlockHeight()
}
return false
}

func (k Keeper) AggregateValues(ctx sdk.Context) {
logger.Info("Start aggregate values")
if !k.isQueueReadyForProcessing(ctx) {
return
}
batches := batchQueueItems(ctx, k)
for batch := range batches {
logger.Info("Processing batch", "batch", batch, "len", len(batches[batch]))
logger.Info("Aggregator processing batch", "batch", batch, "len", len(batches[batch]))
processBatch(ctx, k, batches[batch])
}
logger.Info("End aggregate values")
}

func filterZeroValues(values []AggregatorQueueItem) []AggregatorQueueItem {
Expand Down Expand Up @@ -189,6 +206,10 @@ func fixupUsdItems(values []AggregatorQueueItem) {
}
}

func BlockNumberToString(blockNum int64) string {
return fmt.Sprintf("%020d", blockNum)
}

func batchQueueItems(ctx sdk.Context, k Keeper) map[string][]AggregatorQueueItem {
var batches = map[string][]AggregatorQueueItem{}
k.VisitQueueItems(ctx, func(value AggregatorQueueItem) {
Expand Down
1 change: 1 addition & 0 deletions x/oracle/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func handleMsgOracleVote(ctx sdk.Context, k keeper.Keeper, msg types.MsgOracleVo
Valcons: msg.Valcons,
Owner: msg.Owner,
Weight: weight,
Height: ctx.BlockHeight(),
}
k.UpdateSourceValue(ctx, vote)
k.StoreVote(ctx, vote)
Expand Down
1 change: 1 addition & 0 deletions x/oracle/keeper/keeper_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func (k Keeper) UpdateSourceValue(ctx sdk.Context, vote types.Vote) {
Batch: vote.Batch,
Value: average,
Owner: vote.Owner,
Height: ctx.BlockHeight(),
}
store.Set([]byte(key), k.cdc.MustMarshalBinaryBare(sourceValue))

Expand Down
2 changes: 2 additions & 0 deletions x/oracle/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ type Vote struct {
Valcons string
Owner types.AccAddress
Weight types.Dec
Height int64
}

type SourceValue struct {
SourceName string
Batch string
Value types.Dec
Owner types.AccAddress
Height int64
}

type OracleConfig struct {
Expand Down

0 comments on commit 0fc1669

Please sign in to comment.