Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync watcher #814

Merged
merged 11 commits into from
Nov 30, 2018
23 changes: 23 additions & 0 deletions cmd/sebak/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ var (
flagTransactionsLimit string = common.GetENVValue("SEBAK_TRANSACTIONS_LIMIT", strconv.Itoa(common.DefaultTransactionsInBallotLimit))
flagOperationsInBallotLimit string = common.GetENVValue("SEBAK_OPERATIONS_IN_BALLOT_LIMIT", strconv.Itoa(common.DefaultOperationsInBallotLimit))
flagTxPoolLimit string = common.GetENVValue("SEBAK_TX_POOL_LIMIT", strconv.Itoa(common.DefaultTxPoolLimit))

flagWatcherMode bool = common.GetENVValue("SEBAK_WATCHER_MODE", "0") == "1"
flagWatchInterval string = common.GetENVValue("SEBAK_WATCH_INTERVAL", "5s")
)

var (
Expand Down Expand Up @@ -112,6 +115,7 @@ var (
txPoolNodeLimit uint64
syncCheckPrevBlock time.Duration
jsonrpcbindEndpoint *common.Endpoint
watchInterval time.Duration

logLevel logging.Lvl
log logging.Logger = logging.New("module", "main")
Expand Down Expand Up @@ -220,6 +224,8 @@ func init() {
nodeCmd.Flags().StringVar(&flagHTTPCacheRedisAddrs, "http-cache-redis-addrs", flagHTTPCacheRedisAddrs, "http cache redis address")

nodeCmd.Flags().StringVar(&flagCongressAddress, "set-congress-address", flagCongressAddress, "set congress address")
nodeCmd.Flags().BoolVar(&flagWatcherMode, "watcher-mode", flagWatcherMode, "watcher mode")
nodeCmd.Flags().StringVar(&flagWatchInterval, "watch-interval", flagWatchInterval, "watch interval")

rootCmd.AddCommand(nodeCmd)
}
Expand Down Expand Up @@ -444,6 +450,7 @@ func parseFlagsNode() {
syncFetchTimeout = getTimeDuration(flagSyncFetchTimeout, sync.FetchTimeout, "--sync-fetch-timeout")
syncCheckInterval = getTimeDuration(flagSyncCheckInterval, sync.CheckBlockHeightInterval, "--sync-check-interval")
syncCheckPrevBlock = getTimeDuration(flagSyncCheckPrevBlockInterval, sync.CheckPrevBlockInterval, "--sync-check-prevblock")
watchInterval = getTimeDuration(flagWatchInterval, sync.WatchInterval, "--watch-interval")

{
if ok := common.HTTPCacheAdapterNames[flagHTTPCacheAdapter]; !ok {
Expand Down Expand Up @@ -570,6 +577,10 @@ func parseFlagsNode() {
parsedFlags = append(parsedFlags, "\n\thttp-cache-adapter", httpCacheAdapter)
parsedFlags = append(parsedFlags, "\n\thttp-cache-pool-size", httpCachePoolSize)

if flagWatcherMode {
parsedFlags = append(parsedFlags, "\n\twatcher-mode", flagWatcherMode)
}

// create current Node
localNode, err = node.NewLocalNode(kp, bindEndpoint, "")
if err != nil {
Expand Down Expand Up @@ -681,6 +692,7 @@ func runNode() error {
TxPoolClientLimit: int(txPoolClientLimit),
TxPoolNodeLimit: int(txPoolNodeLimit),
JSONRPCEndpoint: jsonrpcbindEndpoint,
WatcherMode: flagWatcherMode,
}
st, err := storage.NewStorage(storageConfig)
if err != nil {
Expand All @@ -696,6 +708,7 @@ func runNode() error {
c.RetryInterval = syncRetryInterval
c.CheckBlockHeightInterval = syncCheckInterval
c.CheckPrevBlockInterval = syncCheckPrevBlock
c.WatchInterval = watchInterval

syncer := c.NewSyncer()

Expand Down Expand Up @@ -732,6 +745,16 @@ func runNode() error {
syncer.Stop()
})
}
{
if flagWatcherMode == true {
watcher := c.NewWatcher(syncer)
g.Add(func() error {
return watcher.Start()
}, func(error) {
watcher.Stop()
})
}
}
{
cancel := make(chan struct{})
g.Add(func() error {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ require (
github.com/ulule/limiter v2.2.0+incompatible
github.com/vmihailenco/msgpack v4.0.1+incompatible
golang.org/x/net v0.0.0-20180420171651-5f9ae10d9af5
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f // indirect
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f
golang.org/x/sys v0.0.0-20180501092740-78d5f264b493 // indirect
golang.org/x/text v0.3.0 // indirect
gopkg.in/fsnotify.v1 v1.4.7 // indirect
Expand Down
2 changes: 2 additions & 0 deletions lib/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type Config struct {
CommonAccountAddress string

JSONRPCEndpoint *Endpoint

WatcherMode bool
}

func NewConfig(networkID []byte) Config {
Expand Down
1 change: 1 addition & 0 deletions lib/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,5 @@ var (
TransactionAlreadyExistsInPool = NewError(189, "transaction already exists in pool")
BadRequestParameter = NewError(190, "request parameter is invalid")
BallotHasOverMaxOperationsInBallot = NewError(191, "too many operations in ballot")
AllValidatorsNotConnected = NewError(192, "all validators are not connected")
)
4 changes: 4 additions & 0 deletions lib/node/local_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ func (n *LocalNode) SetSync() {
n.state = StateSYNC
}

func (n *LocalNode) SetWatch() {
n.state = StateWATCH
}

func (n *LocalNode) Address() string {
return n.keypair.Address()
}
Expand Down
5 changes: 5 additions & 0 deletions lib/node/node_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const (
StateBOOTING State = iota
StateCONSENSUS
StateSYNC
StateWATCH
)

func (s State) String() string {
Expand All @@ -20,6 +21,8 @@ func (s State) String() string {
return "CONSENSUS"
case 2:
return "SYNC"
case 3:
return "WATCH"
}

return ""
Expand All @@ -38,6 +41,8 @@ func (s *State) UnmarshalJSON(b []byte) (err error) {
c = 1
case "SYNC":
c = 2
case "WATCH":
c = 3
}

*s = State(c)
Expand Down
8 changes: 8 additions & 0 deletions lib/node/runner/api_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ var HandleTransactionCheckerFuncsWithoutBroadcast = []common.CheckerFunc{
PushIntoTransactionPoolFromNode,
}

var HandleTransactionCheckerForWatcherFuncs = []common.CheckerFunc{
TransactionUnmarshal,
HasTransaction,
MessageHasSameSource,
MessageValidate,
BroadcastTransactionFromWatcher,
}

func (api NetworkHandlerNode) ReceiveTransaction(body []byte, funcs []common.CheckerFunc) (transaction.Transaction, error) {
message := common.NetworkMessage{Type: common.TransactionMessage, Data: body}
checker := &MessageChecker{
Expand Down
44 changes: 44 additions & 0 deletions lib/node/runner/checker_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
package runner

import (
"math/rand"

logging "github.com/inconshreveable/log15"

"encoding/json"
Expand Down Expand Up @@ -174,3 +176,45 @@ func BroadcastTransaction(c common.Checker, args ...interface{}) (err error) {

return
}

// BroadcastTransactionFromWatcher is sending tx to one of validators.
// If all validators returns error, it returns error.
func BroadcastTransactionFromWatcher(c common.Checker, args ...interface{}) error {
checker := c.(*MessageChecker)
if checker.Conf.WatcherMode == false {
return nil
}
checker.Log.Debug("transaction from client will be sent")

cm := checker.Consensus.ConnectionManager()
var addrs []string

for _, a := range cm.AllConnected() {
if a != checker.LocalNode.Address() {
addrs = append(addrs, a)
}
}

if len(addrs) <= 0 {
return errors.AllValidatorsNotConnected
}

raddrs := make([]string, len(addrs))
perm := rand.Perm(len(addrs))
for i, v := range perm {
raddrs[v] = addrs[i]
}

var err error
for _, a := range raddrs {
client := cm.GetConnection(a)
_, err = client.SendTransaction(checker.Transaction)
if err == nil {
// Broaadcast from watcher is that send one of them using client api successfully.
checker.Log.Info("send tx to node", "node", a, "tx", checker.Transaction.GetHash())
break
}
checker.Log.Debug("failure to send tx to node", "node", a, "err", err, "tx", checker.Transaction.GetHash())
}
return err
anarcher marked this conversation as resolved.
Show resolved Hide resolved
}
12 changes: 11 additions & 1 deletion lib/node/runner/node_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,16 @@ func (nr *NodeRunner) Ready() {

TransactionsHandler := func(w http.ResponseWriter, r *http.Request) {
if r.Method == "POST" {

checkerFuncs := HandleTransactionCheckerFuncs

if nr.Conf.WatcherMode == true {
checkerFuncs = HandleTransactionCheckerForWatcherFuncs
}

apiHandler.PostTransactionsHandler(
w, r,
nodeHandler.ReceiveTransaction, HandleTransactionCheckerFuncs,
nodeHandler.ReceiveTransaction, checkerFuncs,
)
return
}
Expand Down Expand Up @@ -572,6 +579,9 @@ func (nr *NodeRunner) handleBallotMessage(message common.NetworkMessage) (err er
}

func (nr *NodeRunner) InitRound() {
if nr.Conf.WatcherMode == true {
return
}
// get latest blocks
nr.consensus.SetLatestVotingBasis(voting.Basis{})

Expand Down
19 changes: 19 additions & 0 deletions lib/sync/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (
RetryInterval = 10 * time.Second
CheckBlockHeightInterval = 30 * time.Second
CheckPrevBlockInterval = 30 * time.Second
WatchInterval = 5 * time.Second
)

type Config struct {
Expand All @@ -34,6 +35,7 @@ type Config struct {
RetryInterval time.Duration
CheckBlockHeightInterval time.Duration
CheckPrevBlockInterval time.Duration
WatchInterval time.Duration
}

func NewConfig(localNode *node.LocalNode,
Expand Down Expand Up @@ -103,6 +105,23 @@ func (c *Config) NewValidator() Validator {
return v
}

func (c *Config) NewWatcher(s SyncController) *Watcher {
c.logger.Info("watcher config", "watchInterval", c.WatchInterval)

client := c.NewHTTP2Client()
w := NewWatcher(
s, client,
c.connectionManager,
c.storage,
c.localNode,
func(w *Watcher) {
w.interval = c.WatchInterval
},
)
w.SetLogger(c.logger.New("submodule", "watcher"))
return w
}

func (c *Config) NewHTTP2Client() *common.HTTP2Client {
client, err := common.NewHTTP2Client(c.FetchTimeout, 0, true)
if err != nil {
Expand Down
Loading