Skip to content

Commit

Permalink
Add Raft mode.
Browse files Browse the repository at this point in the history
  • Loading branch information
joelburget committed Aug 24, 2017
1 parent 2620c98 commit 7e94f76
Show file tree
Hide file tree
Showing 372 changed files with 49,655 additions and 2,832 deletions.
45 changes: 44 additions & 1 deletion cmd/geth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@ import (
"github.com/ethereum/go-ethereum/contracts/release"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/params"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
"github.com/ethereum/go-ethereum/raft"
"github.com/naoina/toml"
"time"
)

var (
Expand Down Expand Up @@ -151,7 +154,47 @@ func enableWhisper(ctx *cli.Context) bool {
func makeFullNode(ctx *cli.Context) *node.Node {
stack, cfg := makeConfigNode(ctx)

utils.RegisterEthService(stack, &cfg.Eth)
ethChan := utils.RegisterEthService(stack, &cfg.Eth)

if ctx.GlobalBool(utils.RaftModeFlag.Name) {
blockTimeMillis := ctx.GlobalInt(utils.RaftBlockTimeFlag.Name)
datadir := ctx.GlobalString(utils.DataDirFlag.Name)
joinExistingId := ctx.GlobalInt(utils.RaftJoinExistingFlag.Name)

if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
privkey := cfg.Node.NodeKey()
strId := discover.PubkeyID(&privkey.PublicKey).String()
blockTimeNanos := time.Duration(blockTimeMillis) * time.Millisecond
peers := cfg.Node.StaticNodes()

var myId uint16
var joinExisting bool

if joinExistingId > 0 {
myId = uint16(joinExistingId)
joinExisting = true
} else {
peerIds := make([]string, len(peers))
for peerIdx, peer := range peers {
peerId := peer.ID.String()
peerIds[peerIdx] = peerId
if peerId == strId {
myId = uint16(peerIdx) + 1
}
}

if myId == 0 {
utils.Fatalf("failed to find local enode ID (%v) amongst peer IDs: %v", strId, peerIds)
}
}

ethereum := <-ethChan

return raft.New(ctx, params.TestChainConfig, myId, joinExisting, blockTimeNanos, ethereum, peers, datadir)
}); err != nil {
utils.Fatalf("Failed to register the Raft service: %v", err)
}
}

// Whisper must be explicitly enabled by specifying at least 1 whisper flag or in dev mode
shhEnabled := enableWhisper(ctx)
Expand Down
6 changes: 6 additions & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ var (
utils.GpoPercentileFlag,
utils.ExtraDataFlag,
configFileFlag,
utils.RaftModeFlag,
utils.RaftBlockTimeFlag,
utils.RaftJoinExistingFlag,
utils.EmitCheckpointsFlag,
}

rpcFlags = []cli.Flag{
Expand Down Expand Up @@ -214,6 +218,8 @@ func geth(ctx *cli.Context) error {
// it unlocks any requested accounts, and starts the RPC/IPC interfaces and the
// miner.
func startNode(ctx *cli.Context, stack *node.Node) {
log.DoEmitCheckpoints = ctx.GlobalBool(utils.EmitCheckpointsFlag.Name)

// Start up the node itself
utils.StartNode(stack)

Expand Down
8 changes: 8 additions & 0 deletions cmd/geth/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ var AppHelpFlagGroups = []flagGroup{
utils.TrieCacheGenFlag,
},
},
{
Name: "RAFT",
Flags: []cli.Flag{
utils.RaftModeFlag,
utils.RaftBlockTimeFlag,
utils.RaftJoinExistingFlag,
},
},
{
Name: "ACCOUNT",
Flags: []cli.Flag{
Expand Down
26 changes: 25 additions & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,26 @@ var (
Name: "shh.pow",
Usage: "Minimum POW accepted",
Value: whisper.DefaultMinimumPoW,
}

// Raft flags
RaftModeFlag = cli.BoolFlag{
Name: "raft",
Usage: "If enabled, uses Raft instead of Quorum Chain for consensus",
}
RaftBlockTimeFlag = cli.IntFlag{
Name: "raftblocktime",
Usage: "Amount of time between raft block creations in milliseconds",
Value: 50,
}
RaftJoinExistingFlag = cli.IntFlag{
Name: "raftjoinexisting",
Usage: "The raft ID to assume when joining an pre-existing cluster",
Value: 0,
}
EmitCheckpointsFlag = cli.BoolFlag{
Name: "emitcheckpoints",
Usage: "If enabled, emit specially formatted logging checkpoints",
}
)

Expand Down Expand Up @@ -1002,7 +1022,8 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
}

// RegisterEthService adds an Ethereum client to the stack.
func RegisterEthService(stack *node.Node, cfg *eth.Config) {
func RegisterEthService(stack *node.Node, cfg *eth.Config) <-chan *eth.Ethereum {
nodeChan := make(chan *eth.Ethereum, 1)
var err error
if cfg.SyncMode == downloader.LightSync {
err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
Expand All @@ -1015,12 +1036,15 @@ func RegisterEthService(stack *node.Node, cfg *eth.Config) {
ls, _ := les.NewLesServer(fullNode, cfg)
fullNode.AddLesServer(ls)
}
nodeChan <- fullNode
return fullNode, err
})
}
if err != nil {
Fatalf("Failed to register the Ethereum service: %v", err)
}

return nodeChan
}

// RegisterShhService configures Whisper and adds it to the given node.
Expand Down
2 changes: 2 additions & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,8 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
blockInsertTimer.UpdateSince(bstart)
events = append(events, ChainSideEvent{block})
}

log.EmitCheckpoint(log.BlockCreated, fmt.Sprintf("%x", block.Hash()))
stats.processed++
stats.usedGas += usedGas.Uint64()
stats.report(chain, i)
Expand Down
4 changes: 3 additions & 1 deletion core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,9 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
// Transactor should have enough funds to cover the costs
// cost == V + GP * GL
if currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
return ErrInsufficientFunds
if !types.IsQuorum {
return ErrInsufficientFunds
}
}
intrGas := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead)
if tx.Gas().Cmp(intrGas) < 0 {
Expand Down
1 change: 1 addition & 0 deletions core/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
var (
ErrInvalidSig = errors.New("invalid transaction v, r, s values")
errNoSigner = errors.New("missing signing methods")
IsQuorum = false
)

// deriveSigner makes a *best* guess about which signer to use.
Expand Down
2 changes: 2 additions & 0 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ type ProtocolManager struct {
// wait group is used for graceful shutdowns during downloading
// and processing
wg sync.WaitGroup

raftMode bool
}

// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
Expand Down
2 changes: 2 additions & 0 deletions internal/ethapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1084,8 +1084,10 @@ func submitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (c
from, _ := types.Sender(signer, tx)
addr := crypto.CreateAddress(from, tx.Nonce())
log.Info("Submitted contract creation", "fullhash", tx.Hash().Hex(), "contract", addr.Hex())
log.EmitCheckpoint(log.TxCreated, tx.Hash().Hex(), addr.Hex())
} else {
log.Info("Submitted transaction", "fullhash", tx.Hash().Hex(), "recipient", tx.To())
log.EmitCheckpoint(log.TxCreated, tx.Hash().Hex(), tx.To().Hex())
}
return tx.Hash(), nil
}
Expand Down
18 changes: 18 additions & 0 deletions log/emit_checkpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package log

const (
TxCreated = "TX-CREATED"
TxAccepted = "TX-ACCEPTED"
BecameMinter = "BECAME-MINTER"
BecameVerifier = "BECAME-VERIFIER"
BlockCreated = "BLOCK-CREATED"
BlockVotingStarted = "BLOCK-VOTING-STARTED"
)

var DoEmitCheckpoints = false

func EmitCheckpoint(checkpointName string, logValues ...interface{}) {
if DoEmitCheckpoints {
Info("QUORUM-CHECKPOINT", "name", checkpointName, "data", logValues)
}
}
2 changes: 1 addition & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ func (n *Node) Start() error {

// Initialize the p2p server. This creates the node key and
// discovery databases.
n.config.P2P.PrivateKey = n.config.NodeKey()
n.serverConfig = n.config.P2P
n.serverConfig.PrivateKey = n.config.NodeKey()
n.serverConfig.Name = n.config.NodeName()
if n.serverConfig.StaticNodes == nil {
n.serverConfig.StaticNodes = n.config.StaticNodes()
Expand Down
21 changes: 21 additions & 0 deletions raft/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package raft

type PublicRaftAPI struct {
raftService *RaftService
}

func NewPublicRaftAPI(raftService *RaftService) *PublicRaftAPI {
return &PublicRaftAPI{raftService}
}

func (s *PublicRaftAPI) Role() string {
return s.raftService.raftProtocolManager.NodeInfo().Role
}

func (s *PublicRaftAPI) AddPeer(raftId uint16, enodeId string) error {
return s.raftService.raftProtocolManager.ProposeNewPeer(raftId, enodeId)
}

func (s *PublicRaftAPI) RemovePeer(raftId uint16) {
s.raftService.raftProtocolManager.ProposePeerRemoval(raftId)
}
104 changes: 104 additions & 0 deletions raft/backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package raft

import (
"sync"
"time"

"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
)

type RaftService struct {
blockchain *core.BlockChain
chainDb ethdb.Database // Block chain database
txMu sync.Mutex
txPool *core.TxPool
accountManager *accounts.Manager

raftProtocolManager *ProtocolManager
startPeers []*discover.Node

// we need an event mux to instantiate the blockchain
eventMux *event.TypeMux
minter *minter
}

type RaftNodeInfo struct {
ClusterSize int `json:"clusterSize"`
Genesis common.Hash `json:"genesis"` // SHA3 hash of the host's genesis block
Head common.Hash `json:"head"` // SHA3 hash of the host's best owned block
Role string `json:"role"`
}

func New(ctx *node.ServiceContext, chainConfig *params.ChainConfig, raftId uint16, joinExisting bool, blockTime time.Duration, e *eth.Ethereum, startPeers []*discover.Node, datadir string) (*RaftService, error) {
service := &RaftService{
eventMux: ctx.EventMux,
chainDb: e.ChainDb(),
blockchain: e.BlockChain(),
txPool: e.TxPool(),
accountManager: e.AccountManager(),
startPeers: startPeers,
}

service.minter = newMinter(chainConfig, service, blockTime)

var err error
if service.raftProtocolManager, err = NewProtocolManager(raftId, service.blockchain, service.eventMux, startPeers, joinExisting, datadir, service.minter); err != nil {
return nil, err
}

return service, nil
}

// Backend interface methods:

func (service *RaftService) AccountManager() *accounts.Manager { return service.accountManager }
func (service *RaftService) BlockChain() *core.BlockChain { return service.blockchain }
func (service *RaftService) ChainDb() ethdb.Database { return service.chainDb }
func (service *RaftService) DappDb() ethdb.Database { return nil }
func (service *RaftService) EventMux() *event.TypeMux { return service.eventMux }
func (service *RaftService) TxPool() *core.TxPool { return service.txPool }

// node.Service interface methods:

func (service *RaftService) Protocols() []p2p.Protocol { return []p2p.Protocol{} }
func (service *RaftService) APIs() []rpc.API {
return []rpc.API{
{
Namespace: "raft",
Version: "1.0",
Service: NewPublicRaftAPI(service),
Public: true,
},
}
}

// Start implements node.Service, starting the background data propagation thread
// of the protocol.
func (service *RaftService) Start(p2pServer *p2p.Server) error {
service.raftProtocolManager.Start(p2pServer)
return nil
}

// Stop implements node.Service, stopping the background data propagation thread
// of the protocol.
func (service *RaftService) Stop() error {
service.blockchain.Stop()
service.raftProtocolManager.Stop()
service.eventMux.Stop()

service.chainDb.Close()

log.Info("Raft stopped")
return nil
}
33 changes: 33 additions & 0 deletions raft/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package raft

import (
etcdRaft "github.com/coreos/etcd/raft"
)

const (
protocolName = "raft"
protocolVersion uint64 = 0x01

raftMsg = 0x00

minterRole = etcdRaft.LEADER
verifierRole = etcdRaft.NOT_LEADER

// Raft's ticker interval
tickerMS = 100

// We use a bounded channel of constant size buffering incoming messages
msgChanSize = 1000

// Snapshot after this many raft messages
//
// TODO: measure and get this as low as possible without affecting performance
//
snapshotPeriod = 250

peerUrlKeyPrefix = "peerUrl-"
)

var (
appliedDbKey = []byte("applied")
)
Loading

0 comments on commit 7e94f76

Please sign in to comment.