Skip to content

Commit

Permalink
#101 Calculate rank in ||, post results each 600 blocks. Handle node …
Browse files Browse the repository at this point in the history
…failure, errors check
  • Loading branch information
arturalbov authored and hleb-albau committed Dec 22, 2018
1 parent 6567ac8 commit aa14103
Show file tree
Hide file tree
Showing 13 changed files with 282 additions and 85 deletions.
104 changes: 74 additions & 30 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cybercongress/cyberd/app/types/coin"
"github.com/cybercongress/cyberd/store"
"github.com/cybercongress/cyberd/types"
"github.com/cybercongress/cyberd/util"
"github.com/cybercongress/cyberd/x/bandwidth"
bw "github.com/cybercongress/cyberd/x/bandwidth/types"
cbdbank "github.com/cybercongress/cyberd/x/bank"
Expand Down Expand Up @@ -103,15 +104,16 @@ type CyberdApp struct {
stakeIndex *cbdbank.IndexedKeeper
rankState *rank.RankState

latestRankHash []byte
latestBlockHeight int64

computeUnit rank.ComputeUnit

// TODO: move to RankState???
rankCalculationFinished bool
lastCidCount int64
rankCidCount int64

rankCalcChannel chan []float64
rankCalcChan chan []float64
rankErrChan chan error
}

// NewBasecoinApp returns a reference to a new CyberdApp given a
Expand All @@ -121,7 +123,7 @@ type CyberdApp struct {
// registered, and finally the stores being mounted along with any necessary
// chain initialization.
func NewCyberdApp(
logger log.Logger, db dbm.DB, computeUnit rank.ComputeUnit, baseAppOptions ...func(*baseapp.BaseApp),
logger log.Logger, db dbm.DB, computeUnit rank.ComputeUnit, allowSearch bool, baseAppOptions ...func(*baseapp.BaseApp),
) *CyberdApp {

// create and register app-level codec for TXs and accounts
Expand Down Expand Up @@ -193,7 +195,7 @@ func NewCyberdApp(
app.linkIndexedKeeper = keeper.NewLinkIndexedKeeper(keeper.NewBaseLinkKeeper(ms, dbKeys.links))
app.cidNumKeeper = keeper.NewBaseCidNumberKeeper(ms, dbKeys.cidNum, dbKeys.cidNumReverse)
app.stakeIndex = cbdbank.NewIndexedKeeper(&app.bankKeeper, app.accountKeeper)
app.rankState = rank.NewRankState(&app.linkIndexedKeeper)
app.rankState = rank.NewRankState(&app.linkIndexedKeeper, allowSearch)

// register the staking hooks
// NOTE: stakeKeeper above are passed by reference,
Expand Down Expand Up @@ -233,21 +235,44 @@ func NewCyberdApp(
if err != nil {
cmn.Exit(err.Error())
}

ctx := app.BaseApp.NewContext(true, abci.Header{})
app.latestBlockHeight = int64(ms.GetLatestBlockNumber(ctx))

// build context for current rank calculation round
rankRoundBlockNumber := (app.latestBlockHeight / rank.CalculationPeriod) * rank.CalculationPeriod
// todo: we need to pass all keys cause under the hood it checks keys by commited state :(
rankCtx, err := util.NewContextWithMSVersion(
db, rankRoundBlockNumber,
dbKeys.main, dbKeys.acc, dbKeys.cidNum, dbKeys.cidNumReverse, dbKeys.links, dbKeys.rank, dbKeys.stake,
dbKeys.slashing, dbKeys.params, dbKeys.distr, dbKeys.fees, dbKeys.accBandwidth,
)

// load in-memory data
start := time.Now()
app.BaseApp.Logger.Info("Loading mem state")
app.linkIndexedKeeper.Load(ctx)
app.stakeIndex.Load(ctx)
app.linkIndexedKeeper.Load(rankCtx, ctx)
app.stakeIndex.Load(rankCtx, ctx)
app.BaseApp.Logger.Info("App loaded", "time", time.Since(start))
app.latestRankHash = ms.GetAppHash(ctx)

// BANDWIDTH PARAMS
app.lastTotalSpentBandwidth = ms.GetSpentBandwidth(ctx)
app.currentCreditPrice = math.Float64frombits(ms.GetBandwidthPrice(ctx, bandwidth.BaseCreditPrice))
app.curBlockSpentBandwidth = 0
app.latestBlockHeight = int64(ms.GetLatestBlockNumber(ctx))

// RANK PARAMS
app.rankState.Load(ctx, app.mainKeeper)
app.rankCalculationFinished = true
app.rankCalcChannel = make(chan []float64, 1)
app.rankCalcChan = make(chan []float64, 1)
app.rankCidCount = int64(app.mainKeeper.GetCidsCount(ctx))
app.rankErrChan = make(chan error)

// if we have fallen and need to start new rank calculation
// todo: what if rank didn't changed in new calculation???
if app.latestBlockHeight != 0 && !app.rankState.NextRankReady() {
app.startRankCalculation(ctx)
app.rankCalculationFinished = false
}

app.Seal()
return app
Expand Down Expand Up @@ -489,42 +514,50 @@ func (app *CyberdApp) EndBlocker(ctx sdk.Context, _ abci.RequestEndBlock) abci.R
//linksCount := app.mainKeeper.GetLinksCount(ctx)

//start := time.Now()
// TODO: Deal with hashes (new, old)
// TODO: Run reindexation in parallel
if ctx.BlockHeight()%rank.CalculationPeriod == 0 || ctx.BlockHeight() == 1 {

if !app.rankCalculationFinished {
newRank := <-app.rankCalcChannel
app.rankState.SetNewRank(newRank)
app.rankCalculationFinished = true
select {
case newRank := <-app.rankCalcChan:
app.handleNewRank(ctx, newRank)
case err := <-app.rankErrChan:
// DUMB ERROR HANDLING
app.BaseApp.Logger.Error("Error during rank calculation " + err.Error())
panic(err.Error())
}
}

app.rankState.ApplyNewRank(app.lastCidCount)
app.linkIndexedKeeper.MergeNewLinks()
app.rankState.ApplyNextRank(app.rankCidCount)
// Recalculate index
go app.rankState.BuildCidRankedLinksIndexInParallel(app.rankCidCount)

rankHash := app.rankState.CalculateHash()
app.latestRankHash = rankHash
rankHash := app.rankState.GetCurrentRankHash()
app.mainKeeper.StoreAppHash(ctx, rankHash)

app.rankCalculationFinished = false

app.lastCidCount = int64(app.mainKeeper.GetCidsCount(ctx))
calcCtx := rank.NewCalcContext(ctx, app.linkIndexedKeeper, app.cidNumKeeper, app.stakeIndex)
go rank.CalculateRank(calcCtx, app.rankCalcChannel, app.computeUnit, app.BaseApp.Logger)
app.rankCidCount = int64(app.mainKeeper.GetCidsCount(ctx))
app.linkIndexedKeeper.FixLinks() //todo: synchronize with index recalculation. copy out links
app.stakeIndex.FixUserStake()
app.startRankCalculation(ctx)

} else if !app.rankCalculationFinished {

select {
case newRank := <-app.rankCalcChannel:
app.rankState.SetNewRank(newRank)
app.rankCalculationFinished = true
case newRank := <-app.rankCalcChan:
app.handleNewRank(ctx, newRank)
app.BaseApp.Logger.Info("Rank calculation finished!")
case err := <-app.rankErrChan:
// DUMB ERROR HANDLING
app.BaseApp.Logger.Error("Error during rank calculation " + err.Error())
panic(err.Error())
default:
}

}
//app.BaseApp.Logger.Info(
// "Rank calculated", "steps", steps, "time", time.Since(start), "links", linksCount, "cids", cidsCount,
//)

//CHECK INDEX BUILDING FOR ERROR:
app.rankState.CheckBuildIndexError(app.BaseApp.Logger)

// END RANK CALCULATION

Expand All @@ -534,11 +567,22 @@ func (app *CyberdApp) EndBlocker(ctx sdk.Context, _ abci.RequestEndBlock) abci.R
}
}

func (app *CyberdApp) startRankCalculation(ctx sdk.Context) {
calcCtx := rank.NewCalcContext(ctx, app.linkIndexedKeeper, app.cidNumKeeper, app.stakeIndex)
go rank.CalculateRankInParallel(calcCtx, app.rankCalcChan, app.rankErrChan, app.computeUnit, app.BaseApp.Logger)
}

func (app *CyberdApp) handleNewRank(ctx sdk.Context, newRank []float64) {
app.rankState.SetNextRank(newRank)
app.mainKeeper.StoreNextAppHash(ctx, app.rankState.GetNextRankHash())
app.rankCalculationFinished = true
}

// Implements ABCI
func (app *CyberdApp) Commit() (res abci.ResponseCommit) {

app.BaseApp.Commit()
return abci.ResponseCommit{Data: app.latestRankHash}
return abci.ResponseCommit{Data: app.rankState.GetCurrentRankHash()}
}

// Implements ABCI
Expand All @@ -556,5 +600,5 @@ func (app *CyberdApp) appHash() []byte {
if app.LastBlockHeight() == 0 {
return make([]byte, 0)
}
return app.latestRankHash
return app.rankState.GetCurrentRankHash()
}
8 changes: 5 additions & 3 deletions daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (
)

const (
flagGpuEnabled = "compute-rank-on-gpu"
flagGpuEnabled = "compute-rank-on-gpu"
flagSearchRpcQueryEnabled = "allow-search-rpc-query"
)

func main() {
Expand All @@ -48,6 +49,7 @@ func main() {
for _, c := range rootCmd.Commands() {
if c.Use == "start" {
c.Flags().Bool(flagGpuEnabled, true, "Run cyberd with cuda calculations")
c.Flags().Bool(flagSearchRpcQueryEnabled, true, "Build index of links with ranks and allow to query search through RPC")
}
}

Expand All @@ -68,7 +70,7 @@ func newApp(logger log.Logger, db dbm.DB, storeTracer io.Writer) abci.Applicatio
if viper.GetBool(flagGpuEnabled) {
computeUnit = rank.GPU
}
cyberdApp := app.NewCyberdApp(logger, db, computeUnit, pruning)
cyberdApp := app.NewCyberdApp(logger, db, computeUnit, viper.GetBool(flagSearchRpcQueryEnabled), pruning)
rpc.SetCyberdApp(cyberdApp)
return cyberdApp
}
Expand All @@ -77,6 +79,6 @@ func exportAppStateAndTMValidators(
logger log.Logger, db dbm.DB, traceStore io.Writer, height int64, forZeroHeight bool,
) (json.RawMessage, []tmtypes.GenesisValidator, error) {

capp := app.NewCyberdApp(logger, db, rank.CPU)
capp := app.NewCyberdApp(logger, db, rank.CPU, false)
return capp.ExportAppStateAndValidators()
}
12 changes: 11 additions & 1 deletion docs/rank.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,14 @@ Node doesn't participate in consensus till rank not calculated (obvious).
We probably don't care about indices of new cids and user stakes cause they are ordered by numbers
and in rank calculation algorithm no needed numbers will be ignored (need to check).

We could store cidCount per rank calculation.
We could store cidCount per rank calculation.

#### New links handling

Add new incoming links to index with rank 0, so they immediately can be seen by users.
Include those "zero" ranks to consensus hash.

Problems:
1. Reload of app state after failure (dealing with hashes)
2. Recalculation of index. New links will go to next rank calculation and old index (with those links)
will be replaced by currently calculated rank.
13 changes: 12 additions & 1 deletion store/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
var lastCidNumberKey = []byte("cyberd_last_cid_number")
var linksCountKey = []byte("cyberd_links_count")
var lastAppHashKey = []byte("cyberd_app_hash")
var nextAppHashKey = []byte("cyberd_next_app_hash")
var genesisSupplyKey = []byte("cyberd_genesis_supply")
var lastBandwidthPrice = []byte("cyberd_last_bandwidth_price")
var spentBandwidth = []byte("cyberd_spent_bandwidth")
Expand Down Expand Up @@ -84,6 +85,17 @@ func (ms MainKeeper) StoreAppHash(ctx sdk.Context, hash []byte) {
store.Set(lastAppHashKey, hash)
}

func (ms MainKeeper) GetNextAppHash(ctx sdk.Context) []byte {
store := ctx.KVStore(ms.key)
return store.Get(nextAppHashKey)
}

func (ms MainKeeper) StoreNextAppHash(ctx sdk.Context, hash []byte) {

store := ctx.KVStore(ms.key)
store.Set(nextAppHashKey, hash)
}

func (ms MainKeeper) GetBandwidthPrice(ctx sdk.Context, basePrice float64) uint64 {
store := ctx.KVStore(ms.key)
priceAsBytes := store.Get(lastBandwidthPrice)
Expand All @@ -101,7 +113,6 @@ func (ms MainKeeper) StoreBandwidthPrice(ctx sdk.Context, price uint64) {
store.Set(lastBandwidthPrice, priceAsBytes)
}


func (ms MainKeeper) GetSpentBandwidth(ctx sdk.Context) uint64 {
store := ctx.KVStore(ms.key)
bandwidthAsBytes := store.Get(spentBandwidth)
Expand Down
25 changes: 25 additions & 0 deletions util/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package util

import (
"github.com/cosmos/cosmos-sdk/store"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/db"
)

func NewContextWithMSVersion(db db.DB, version int64, keys ...*sdk.KVStoreKey) (sdk.Context, error) {

ms := store.NewCommitMultiStore(db)

for _, key := range keys {
ms.MountStoreWithDB(key, sdk.StoreTypeIAVL, nil)
}

err := ms.LoadVersion(version)

if err != nil {
return sdk.Context{}, err
}

return sdk.NewContext(ms, types.Header{Height: version}, false, nil), nil
}
27 changes: 17 additions & 10 deletions x/bank/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ type IndexedKeeper struct {

accKeeper auth.AccountKeeper

userTotalStake map[cbd.AccNumber]uint64
userTotalStake map[cbd.AccNumber]uint64
userNewTotalStake map[cbd.AccNumber]uint64

// used to track accs with changed stake
accsToUpdate []sdk.AccAddress
Expand All @@ -36,25 +37,31 @@ func NewIndexedKeeper(keeper *Keeper, accKeeper auth.AccountKeeper) *IndexedKeep
return &index
}

func (s *IndexedKeeper) Empty() {
s.userTotalStake = make(map[cbd.AccNumber]uint64)
}

func (s *IndexedKeeper) Load(ctx sdk.Context) {
func (s *IndexedKeeper) Load(rankCtx sdk.Context, freshCtx sdk.Context) {

s.userTotalStake = make(map[cbd.AccNumber]uint64)
s.accKeeper.IterateAccounts(rankCtx, s.getCollectFunc(rankCtx, s.userTotalStake))

collect := func(acc auth.Account) bool {
s.userNewTotalStake = make(map[cbd.AccNumber]uint64)
s.accKeeper.IterateAccounts(freshCtx, s.getCollectFunc(freshCtx, s.userNewTotalStake))
}

func (s *IndexedKeeper) getCollectFunc(ctx sdk.Context, userStake map[cbd.AccNumber]uint64) func(acc auth.Account) bool {
return func(acc auth.Account) bool {
balance := s.Keeper.GetAccountTotalStake(ctx, acc.GetAddress())
s.userTotalStake[cbd.AccNumber(acc.GetAccountNumber())] = uint64(balance)
userStake[cbd.AccNumber(acc.GetAccountNumber())] = uint64(balance)
return false
}
}

s.accKeeper.IterateAccounts(ctx, collect)
func (s *IndexedKeeper) FixUserStake() {
for k, v := range s.userNewTotalStake {
s.userTotalStake[k] = v
}
}

func (s *IndexedKeeper) UpdateStake(acc cbd.AccNumber, stake int64) {
s.userTotalStake[acc] += uint64(stake)
s.userNewTotalStake[acc] += uint64(stake)
}

func (s *IndexedKeeper) GetTotalStakes() map[cbd.AccNumber]uint64 {
Expand Down

0 comments on commit aa14103

Please sign in to comment.