Skip to content

Commit

Permalink
#101 Copy state before index creation
Browse files Browse the repository at this point in the history
  • Loading branch information
hleb-albau authored and arturalbov committed Dec 26, 2018
1 parent 0de1188 commit 1243919
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 27 deletions.
8 changes: 5 additions & 3 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
cbdbank "github.com/cybercongress/cyberd/x/bank"
"github.com/cybercongress/cyberd/x/link"
"github.com/cybercongress/cyberd/x/link/keeper"
lnk "github.com/cybercongress/cyberd/x/link/types"
"github.com/cybercongress/cyberd/x/mint"
"github.com/cybercongress/cyberd/x/rank"
abci "github.com/tendermint/tendermint/abci/types"
Expand Down Expand Up @@ -162,7 +163,7 @@ func NewCyberdApp(
app.linkIndexedKeeper = keeper.NewLinkIndexedKeeper(keeper.NewBaseLinkKeeper(ms, dbKeys.links))
app.cidNumKeeper = keeper.NewBaseCidNumberKeeper(ms, dbKeys.cidNum, dbKeys.cidNumReverse)
app.stakeIndex = cbdbank.NewIndexedKeeper(&app.bankKeeper, app.accountKeeper)
app.rankState = rank.NewRankState(&app.linkIndexedKeeper, allowSearch)
app.rankState = rank.NewRankState(allowSearch)

// register the staking hooks
// NOTE: stakeKeeper above are passed by reference,
Expand Down Expand Up @@ -484,7 +485,9 @@ func (app *CyberdApp) EndBlocker(ctx sdk.Context, _ abci.RequestEndBlock) abci.R

app.rankState.ApplyNextRank(app.rankCidCount)
// Recalculate index
go app.rankState.BuildCidRankedLinksIndexInParallel(app.rankCidCount)
// todo state copied
outLinksCopy := lnk.Links(app.linkIndexedKeeper.GetOutLinks()).Copy()
go app.rankState.BuildCidRankedLinksIndexInParallel(app.rankCidCount, outLinksCopy)

rankHash := app.rankState.GetCurrentRankHash()
app.mainKeeper.StoreAppHash(ctx, rankHash)
Expand All @@ -501,7 +504,6 @@ func (app *CyberdApp) EndBlocker(ctx sdk.Context, _ abci.RequestEndBlock) abci.R
select {
case newRank := <-app.rankCalcChan:
app.handleNewRank(ctx, newRank)
app.BaseApp.Logger.Info("Rank calculation finished!")
case err := <-app.rankErrChan:
// DUMB ERROR HANDLING
app.BaseApp.Logger.Error("Error during rank calculation " + err.Error())
Expand Down
11 changes: 3 additions & 8 deletions x/link/keeper/link_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,9 @@ func (i *LinkIndexedKeeper) Load(rankCtx sdk.Context, freshCtx sdk.Context) {
}

func (i *LinkIndexedKeeper) FixLinks() {

for k, v := range i.newInLinks {
i.inLinks[k] = v
}

for k, v := range i.newOutLinks {
i.outLinks[k] = v
}
// todo state copied
i.inLinks = Links(i.newInLinks).Copy()
i.outLinks = Links(i.newOutLinks).Copy()
}

func (i *LinkIndexedKeeper) PutIntoIndex(link Link) {
Expand Down
18 changes: 18 additions & 0 deletions x/link/types/cid.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,21 @@ func (links Links) Put(from CidNumber, to CidNumber, acc AccNumber) {
cidLinks[to] = users
links[from] = cidLinks
}

func (links Links) Copy() Links {

linksCopy := make(Links, len(links))

for from := range links {
fromLinks := make(CidLinks, len(links[from]))
for to := range links[from] {
users := make(map[AccNumber]struct{}, len(links[from][to]))
for u := range links[from][to] {
users[u] = struct{}{}
}
fromLinks[to] = users
}
linksCopy[from] = fromLinks
}
return linksCopy
}
13 changes: 10 additions & 3 deletions x/rank/calculate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rank

import (
"github.com/tendermint/tendermint/libs/log"
"time"
)

type ComputeUnit int
Expand All @@ -11,13 +12,19 @@ const (
GPU ComputeUnit = iota
)

func CalculateRank(ctx *CalculationContext, unit ComputeUnit, logger log.Logger) []float64 {
func CalculateRank(ctx *CalculationContext, unit ComputeUnit, logger log.Logger) (rank []float64) {
start := time.Now()
if unit == CPU {
//used only for development
return calculateRankCPU(ctx)
rank = calculateRankCPU(ctx)

} else {
return calculateRankGPU(ctx, logger)
rank = calculateRankGPU(ctx, logger)
}
logger.Info(
"Rank calculated", "time", time.Since(start), "links", len(ctx.outLinks), "cids", ctx.cidsCount,
)
return
}

func CalculateRankInParallel(
Expand Down
22 changes: 9 additions & 13 deletions x/rank/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"errors"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cybercongress/cyberd/store"
"github.com/cybercongress/cyberd/x/link/keeper"
. "github.com/cybercongress/cyberd/x/link/types"
"github.com/tendermint/tendermint/libs/log"
"math"
Expand All @@ -23,8 +22,6 @@ func (c RankedCidNumber) GetNumber() CidNumber { return c.number }
func (c RankedCidNumber) GetRank() float64 { return c.rank }

type RankState struct {
linkIndex *keeper.LinkIndexedKeeper

cidRankedLinksIndex []cidRankedLinks
networkCidRank []float64 // array index is cid number
nextCidRank []float64 // array index is cid number
Expand All @@ -36,10 +33,9 @@ type RankState struct {
indexErrChan chan error
}

func NewRankState(linkIndex *keeper.LinkIndexedKeeper, allowSearch bool) *RankState {

func NewRankState(allowSearch bool) *RankState {
return &RankState{
linkIndex: linkIndex, allowSearch: allowSearch, indexErrChan: make(chan error),
allowSearch: allowSearch, indexErrChan: make(chan error),
}
}

Expand Down Expand Up @@ -107,16 +103,16 @@ func (s *RankState) ApplyNextRank(cidsCount int64) {
s.nextCidRank = nil
}

func (s *RankState) BuildCidRankedLinksIndex(cidsCount int64) {
func (s *RankState) BuildCidRankedLinksIndex(cidsCount int64, outLinks Links) {
// If search on this node is not allowed then we don't need to build index
if !s.allowSearch {
if !s.allowSearch || s.networkCidRank == nil {
return
}

newIndex := make([]cidRankedLinks, cidsCount)

for cidNumber := CidNumber(0); cidNumber < CidNumber(cidsCount); cidNumber++ {
cidOutLinks := s.linkIndex.GetOutLinks()[cidNumber]
cidOutLinks := outLinks[cidNumber]
cidSortedByRankLinkedCids := s.getLinksSortedByRank(cidOutLinks)
newIndex[cidNumber] = cidSortedByRankLinkedCids
}
Expand All @@ -125,23 +121,23 @@ func (s *RankState) BuildCidRankedLinksIndex(cidsCount int64) {
}

// Used for building index in parallel
func (s *RankState) BuildCidRankedLinksIndexInParallel(cidsCount int64) {
func (s *RankState) BuildCidRankedLinksIndexInParallel(cidsCount int64, outLinks Links) {
defer func() {
if r := recover(); r != nil {
s.indexErrChan <- r.(error)
}
}()

s.BuildCidRankedLinksIndex(cidsCount)
s.BuildCidRankedLinksIndex(cidsCount, outLinks)
}

func (s *RankState) CheckBuildIndexError(logger log.Logger) {
if s.allowSearch {
select {
case err := <- s.indexErrChan:
case err := <-s.indexErrChan:
// DUMB ERROR HANDLING
logger.Error("Error during building rank index " + err.Error())
panic(err.Error())
panic(err)
default:
}
}
Expand Down

0 comments on commit 1243919

Please sign in to comment.