Navigation Menu

Skip to content

Commit

Permalink
main, utils, eth, downloader, les: Add an option to stop geth once in…
Browse files Browse the repository at this point in the history
  • Loading branch information
danrpts committed Jun 18, 2018
1 parent c95e4a8 commit fcef03b
Show file tree
Hide file tree
Showing 15 changed files with 43 additions and 16 deletions.
2 changes: 1 addition & 1 deletion cmd/geth/chaincmd.go
Expand Up @@ -373,7 +373,7 @@ func copyDb(ctx *cli.Context) error {
chain, chainDb := utils.MakeChain(ctx, stack)

syncmode := *utils.GlobalTextMarshaler(ctx, utils.SyncModeFlag.Name).(*downloader.SyncMode)
dl := downloader.New(syncmode, chainDb, new(event.TypeMux), chain, nil, nil)
dl := downloader.New(syncmode, chainDb, new(event.TypeMux), chain, nil, nil, ctx.GlobalDuration(utils.ExitInSyncFlag.Name))

// Create a source peer to satisfy downloader requests from
db, err := ethdb.NewLDBDatabase(ctx.Args().First(), ctx.GlobalInt(utils.CacheFlag.Name), 256)
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/main.go
Expand Up @@ -72,6 +72,7 @@ var (
utils.EthashDatasetDirFlag,
utils.EthashDatasetsInMemoryFlag,
utils.EthashDatasetsOnDiskFlag,
utils.ExitInSyncFlag,
utils.TxPoolNoLocalsFlag,
utils.TxPoolJournalFlag,
utils.TxPoolRejournalFlag,
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/usage.go
Expand Up @@ -75,6 +75,7 @@ var AppHelpFlagGroups = []flagGroup{
utils.TestnetFlag,
utils.RinkebyFlag,
utils.SyncModeFlag,
utils.ExitInSyncFlag,
utils.GCModeFlag,
utils.EthStatsURLFlag,
utils.IdentityFlag,
Expand Down
6 changes: 6 additions & 0 deletions cmd/utils/flags.go
Expand Up @@ -170,6 +170,11 @@ var (
Usage: `Blockchain sync mode ("fast", "full", or "light")`,
Value: &defaultSyncMode,
}
ExitInSyncFlag = cli.DurationFlag{
Name: "exitinsync",
Usage: "Exit after block synchronisation",
Value: downloader.NoExit,
}
GCModeFlag = cli.StringFlag{
Name: "gcmode",
Usage: `Blockchain garbage collection mode ("full", "archive")`,
Expand Down Expand Up @@ -1040,6 +1045,7 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
if ctx.GlobalIsSet(NetworkIdFlag.Name) {
cfg.NetworkId = ctx.GlobalUint64(NetworkIdFlag.Name)
}
cfg.ExitInSync = ctx.GlobalDuration(ExitInSyncFlag.Name)

if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheDatabaseFlag.Name) {
cfg.DatabaseCache = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheDatabaseFlag.Name) / 100
Expand Down
2 changes: 1 addition & 1 deletion eth/backend.go
Expand Up @@ -163,7 +163,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
}
eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain)

if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {
if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb, config.ExitInSync); err != nil {
return nil, err
}
eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine)
Expand Down
7 changes: 4 additions & 3 deletions eth/config.go
Expand Up @@ -79,9 +79,10 @@ type Config struct {
Genesis *core.Genesis `toml:",omitempty"`

// Protocol options
NetworkId uint64 // Network ID to use for selecting peers to connect to
SyncMode downloader.SyncMode
NoPruning bool
NetworkId uint64 // Network ID to use for selecting peers to connect to
SyncMode downloader.SyncMode
NoPruning bool
ExitInSync time.Duration

// Light client options
LightServ int `toml:",omitempty"` // Maximum percentage of time allowed for serving LES requests
Expand Down
18 changes: 17 additions & 1 deletion eth/downloader/downloader.go
Expand Up @@ -23,6 +23,7 @@ import (
"math/big"
"sync"
"sync/atomic"
"syscall"
"time"

ethereum "github.com/ethereum/go-ethereum"
Expand All @@ -36,6 +37,10 @@ import (
"github.com/ethereum/go-ethereum/params"
)

const (
NoExit time.Duration = -1 * time.Second
)

var (
MaxHashFetch = 512 // Amount of hashes to be fetched per retrieval request
MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request
Expand Down Expand Up @@ -115,6 +120,9 @@ type Downloader struct {
// Callbacks
dropPeer peerDropFn // Drops a peer for misbehaving

initialSync int32
exitInSync time.Duration

// Status
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
synchronising int32
Expand Down Expand Up @@ -198,7 +206,7 @@ type BlockChain interface {
}

// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn, exitInSync time.Duration) *Downloader {
if lightchain == nil {
lightchain = chain
}
Expand All @@ -214,6 +222,7 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockC
blockchain: chain,
lightchain: lightchain,
dropPeer: dropPeer,
exitInSync: exitInSync,
headerCh: make(chan dataPack, 1),
bodyCh: make(chan dataPack, 1),
receiptCh: make(chan dataPack, 1),
Expand Down Expand Up @@ -317,6 +326,13 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
err := d.synchronise(id, head, td, mode)
switch err {
case nil:
if atomic.CompareAndSwapInt32(&d.initialSync, 0, 1) && d.exitInSync > NoExit {
go func(exitInSync time.Duration) {
log.Debug("Synchronisation completed, exitting", "countdown", exitInSync)
time.Sleep(exitInSync)
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}(d.exitInSync)
}
case errBusy:

case errTimeout, errBadPeer, errStallingPeer,
Expand Down
2 changes: 1 addition & 1 deletion eth/downloader/downloader_test.go
Expand Up @@ -96,7 +96,7 @@ func newTester() *downloadTester {
tester.stateDb = ethdb.NewMemDatabase()
tester.stateDb.Put(genesis.Root().Bytes(), []byte{0x00})

tester.downloader = New(FullSync, tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer)
tester.downloader = New(FullSync, tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer, NoExit)

return tester
}
Expand Down
5 changes: 3 additions & 2 deletions eth/handler.go
Expand Up @@ -73,6 +73,7 @@ type ProtocolManager struct {
blockchain *core.BlockChain
chainconfig *params.ChainConfig
maxPeers int
exitInSync time.Duration

downloader *downloader.Downloader
fetcher *fetcher.Fetcher
Expand All @@ -98,7 +99,7 @@ type ProtocolManager struct {

// NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the Ethereum network.
func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, exitInSync time.Duration) (*ProtocolManager, error) {
// Create the protocol manager with the base fields
manager := &ProtocolManager{
networkID: networkID,
Expand Down Expand Up @@ -159,7 +160,7 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
return nil, errIncompatibleConfig
}
// Construct the different synchronisation mechanisms
manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)
manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer, exitInSync)

validator := func(header *types.Header) error {
return engine.VerifyHeader(blockchain, header, true)
Expand Down
2 changes: 1 addition & 1 deletion eth/handler_test.go
Expand Up @@ -474,7 +474,7 @@ func testDAOChallenge(t *testing.T, localForked, remoteForked bool, timeout bool
genesis = gspec.MustCommit(db)
blockchain, _ = core.NewBlockChain(db, nil, config, pow, vm.Config{})
)
pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, evmux, new(testTxPool), pow, blockchain, db)
pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, evmux, new(testTxPool), pow, blockchain, db, downloader.NoExit)
if err != nil {
t.Fatalf("failed to start test protocol manager: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion eth/helper_test.go
Expand Up @@ -66,7 +66,7 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func
panic(err)
}

pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db)
pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, downloader.NoExit)
if err != nil {
return nil, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion les/backend.go
Expand Up @@ -127,7 +127,7 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
}

leth.txPool = light.NewTxPool(leth.chainConfig, leth.blockchain, leth.relay)
if leth.protocolManager, err = NewProtocolManager(leth.chainConfig, true, ClientProtocolVersions, config.NetworkId, leth.eventMux, leth.engine, leth.peers, leth.blockchain, nil, chainDb, leth.odr, leth.relay, leth.serverPool, quitSync, &leth.wg); err != nil {
if leth.protocolManager, err = NewProtocolManager(leth.chainConfig, true, ClientProtocolVersions, config.NetworkId, leth.eventMux, leth.engine, leth.peers, leth.blockchain, nil, chainDb, leth.odr, leth.relay, leth.serverPool, quitSync, &leth.wg, config.ExitInSync); err != nil {
return nil, err
}
leth.ApiBackend = &LesApiBackend{leth, nil}
Expand Down
4 changes: 2 additions & 2 deletions les/handler.go
Expand Up @@ -129,7 +129,7 @@ type ProtocolManager struct {

// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the ethereum network.
func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, protocolVersions []uint, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay, serverPool *serverPool, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) {
func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, protocolVersions []uint, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay, serverPool *serverPool, quitSync chan struct{}, wg *sync.WaitGroup, exitInSync time.Duration) (*ProtocolManager, error) {
// Create the protocol manager with the base fields
manager := &ProtocolManager{
lightSync: lightSync,
Expand Down Expand Up @@ -207,7 +207,7 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, protoco
}

if lightSync {
manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, nil, blockchain, removePeer)
manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, nil, blockchain, removePeer, exitInSync)
manager.peers.notify((*downloaderPeerNotify)(manager))
manager.fetcher = newLightFetcher(manager)
}
Expand Down
3 changes: 2 additions & 1 deletion les/helper_test.go
Expand Up @@ -24,6 +24,7 @@ import (
"math/big"
"sync"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/ethash"
Expand Down Expand Up @@ -178,7 +179,7 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor
} else {
protocolVersions = ServerProtocolVersions
}
pm, err := NewProtocolManager(gspec.Config, lightSync, protocolVersions, NetworkId, evmux, engine, peers, chain, nil, db, odr, nil, nil, make(chan struct{}), new(sync.WaitGroup))
pm, err := NewProtocolManager(gspec.Config, lightSync, protocolVersions, NetworkId, evmux, engine, peers, chain, nil, db, odr, nil, nil, make(chan struct{}), new(sync.WaitGroup), -1 * time.Second)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion les/server.go
Expand Up @@ -52,7 +52,7 @@ type LesServer struct {

func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) {
quitSync := make(chan struct{})
pm, err := NewProtocolManager(eth.BlockChain().Config(), false, ServerProtocolVersions, config.NetworkId, eth.EventMux(), eth.Engine(), newPeerSet(), eth.BlockChain(), eth.TxPool(), eth.ChainDb(), nil, nil, nil, quitSync, new(sync.WaitGroup))
pm, err := NewProtocolManager(eth.BlockChain().Config(), false, ServerProtocolVersions, config.NetworkId, eth.EventMux(), eth.Engine(), newPeerSet(), eth.BlockChain(), eth.TxPool(), eth.ChainDb(), nil, nil, nil, quitSync, new(sync.WaitGroup), config.ExitInSync)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit fcef03b

Please sign in to comment.