Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd, consensus/ethash, eth: miner push notifications #17347

Merged
merged 1 commit into from Aug 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/geth/main.go
Expand Up @@ -98,8 +98,9 @@ var (
utils.MaxPendingPeersFlag,
utils.EtherbaseFlag,
utils.GasPriceFlag,
utils.MinerThreadsFlag,
utils.MiningEnabledFlag,
utils.MinerThreadsFlag,
utils.MinerNotifyFlag,
utils.TargetGasLimitFlag,
utils.NATFlag,
utils.NoDiscoverFlag,
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/usage.go
Expand Up @@ -185,6 +185,7 @@ var AppHelpFlagGroups = []flagGroup{
Flags: []cli.Flag{
utils.MiningEnabledFlag,
utils.MinerThreadsFlag,
utils.MinerNotifyFlag,
utils.EtherbaseFlag,
utils.TargetGasLimitFlag,
utils.GasPriceFlag,
Expand Down
14 changes: 10 additions & 4 deletions cmd/utils/flags.go
Expand Up @@ -24,7 +24,6 @@ import (
"math/big"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -318,9 +317,13 @@ var (
Usage: "Enable mining",
}
MinerThreadsFlag = cli.IntFlag{
Name: "minerthreads",
Name: "miner.threads",
Usage: "Number of CPU threads to use for mining",
Value: runtime.NumCPU(),
Value: 0,
}
MinerNotifyFlag = cli.StringFlag{
Name: "miner.notify",
Usage: "Comma separated HTTP URL list to notify of new work packages",
}
TargetGasLimitFlag = cli.Uint64Flag{
Name: "targetgaslimit",
Expand Down Expand Up @@ -1093,6 +1096,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
if ctx.GlobalIsSet(MinerThreadsFlag.Name) {
cfg.MinerThreads = ctx.GlobalInt(MinerThreadsFlag.Name)
}
if ctx.GlobalIsSet(MinerNotifyFlag.Name) {
cfg.MinerNotify = strings.Split(ctx.GlobalString(MinerNotifyFlag.Name), ",")
}
if ctx.GlobalIsSet(DocRootFlag.Name) {
cfg.DocRoot = ctx.GlobalString(DocRootFlag.Name)
}
Expand Down Expand Up @@ -1293,7 +1299,7 @@ func MakeChain(ctx *cli.Context, stack *node.Node) (chain *core.BlockChain, chai
DatasetDir: stack.ResolvePath(eth.DefaultConfig.Ethash.DatasetDir),
DatasetsInMem: eth.DefaultConfig.Ethash.DatasetsInMem,
DatasetsOnDisk: eth.DefaultConfig.Ethash.DatasetsOnDisk,
})
}, nil)
}
}
if gcmode := ctx.GlobalString(GCModeFlag.Name); gcmode != "full" && gcmode != "archive" {
Expand Down
2 changes: 1 addition & 1 deletion consensus/ethash/algorithm_test.go
Expand Up @@ -729,7 +729,7 @@ func TestConcurrentDiskCacheGeneration(t *testing.T) {

go func(idx int) {
defer pend.Done()
ethash := New(Config{cachedir, 0, 1, "", 0, 0, ModeNormal})
ethash := New(Config{cachedir, 0, 1, "", 0, 0, ModeNormal}, nil)
defer ethash.Close()
if err := ethash.VerifySeal(nil, block.Header()); err != nil {
t.Errorf("proc %d: block verification failed: %v", idx, err)
Expand Down
2 changes: 1 addition & 1 deletion consensus/ethash/consensus.go
Expand Up @@ -493,7 +493,7 @@ func (ethash *Ethash) VerifySeal(chain consensus.ChainReader, header *types.Head
if !bytes.Equal(header.MixDigest[:], digest) {
return errInvalidMixDigest
}
target := new(big.Int).Div(maxUint256, header.Difficulty)
target := new(big.Int).Div(two256, header.Difficulty)
if new(big.Int).SetBytes(result).Cmp(target) > 0 {
return errInvalidPoW
}
Expand Down
18 changes: 10 additions & 8 deletions consensus/ethash/ethash.go
Expand Up @@ -45,11 +45,11 @@ import (
var ErrInvalidDumpMagic = errors.New("invalid dump magic")

var (
// maxUint256 is a big integer representing 2^256-1
maxUint256 = new(big.Int).Exp(big.NewInt(2), big.NewInt(256), big.NewInt(0))
// two256 is a big integer representing 2^256
two256 = new(big.Int).Exp(big.NewInt(2), big.NewInt(256), big.NewInt(0))

// sharedEthash is a full instance that can be shared between multiple users.
sharedEthash = New(Config{"", 3, 0, "", 1, 0, ModeNormal})
sharedEthash = New(Config{"", 3, 0, "", 1, 0, ModeNormal}, nil)

// algorithmRevision is the data structure version used for file naming.
algorithmRevision = 23
Expand Down Expand Up @@ -447,8 +447,10 @@ type Ethash struct {
exitCh chan chan error // Notification channel to exiting backend threads
}

// New creates a full sized ethash PoW scheme and starts a background thread for remote mining.
func New(config Config) *Ethash {
// New creates a full sized ethash PoW scheme and starts a background thread for
// remote mining, also optionally notifying a batch of remote services of new work
// packages.
func New(config Config, notify []string) *Ethash {
if config.CachesInMem <= 0 {
log.Warn("One ethash cache must always be in memory", "requested", config.CachesInMem)
config.CachesInMem = 1
Expand All @@ -473,13 +475,13 @@ func New(config Config) *Ethash {
submitRateCh: make(chan *hashrate),
exitCh: make(chan chan error),
}
go ethash.remote()
go ethash.remote(notify)
return ethash
}

// NewTester creates a small sized ethash PoW scheme useful only for testing
// purposes.
func NewTester() *Ethash {
func NewTester(notify []string) *Ethash {
ethash := &Ethash{
config: Config{PowMode: ModeTest},
caches: newlru("cache", 1, newCache),
Expand All @@ -494,7 +496,7 @@ func NewTester() *Ethash {
submitRateCh: make(chan *hashrate),
exitCh: make(chan chan error),
}
go ethash.remote()
go ethash.remote(notify)
return ethash
}

Expand Down
43 changes: 20 additions & 23 deletions consensus/ethash/ethash_test.go
Expand Up @@ -32,17 +32,18 @@ import (

// Tests that ethash works correctly in test mode.
func TestTestMode(t *testing.T) {
head := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)}
header := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)}

ethash := NewTester()
ethash := NewTester(nil)
defer ethash.Close()
block, err := ethash.Seal(nil, types.NewBlockWithHeader(head), nil)

block, err := ethash.Seal(nil, types.NewBlockWithHeader(header), nil)
if err != nil {
t.Fatalf("failed to seal block: %v", err)
}
head.Nonce = types.EncodeNonce(block.Nonce())
head.MixDigest = block.MixDigest()
if err := ethash.VerifySeal(nil, head); err != nil {
header.Nonce = types.EncodeNonce(block.Nonce())
header.MixDigest = block.MixDigest()
if err := ethash.VerifySeal(nil, header); err != nil {
t.Fatalf("unexpected verification error: %v", err)
}
}
Expand All @@ -55,7 +56,7 @@ func TestCacheFileEvict(t *testing.T) {
t.Fatal(err)
}
defer os.RemoveAll(tmpdir)
e := New(Config{CachesInMem: 3, CachesOnDisk: 10, CacheDir: tmpdir, PowMode: ModeTest})
e := New(Config{CachesInMem: 3, CachesOnDisk: 10, CacheDir: tmpdir, PowMode: ModeTest}, nil)
defer e.Close()

workers := 8
Expand All @@ -78,21 +79,21 @@ func verifyTest(wg *sync.WaitGroup, e *Ethash, workerIndex, epochs int) {
if block < 0 {
block = 0
}
head := &types.Header{Number: big.NewInt(block), Difficulty: big.NewInt(100)}
e.VerifySeal(nil, head)
header := &types.Header{Number: big.NewInt(block), Difficulty: big.NewInt(100)}
e.VerifySeal(nil, header)
}
}

func TestRemoteSealer(t *testing.T) {
ethash := NewTester()
ethash := NewTester(nil)
defer ethash.Close()

api := &API{ethash}
if _, err := api.GetWork(); err != errNoMiningWork {
t.Error("expect to return an error indicate there is no mining work")
}

head := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)}
block := types.NewBlockWithHeader(head)
header := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)}
block := types.NewBlockWithHeader(header)

// Push new work.
ethash.Seal(nil, block, nil)
Expand All @@ -108,16 +109,14 @@ func TestRemoteSealer(t *testing.T) {
if res := api.SubmitWork(types.BlockNonce{}, block.HashNoNonce(), common.Hash{}); res {
t.Error("expect to return false when submit a fake solution")
}

// Push new block with same block number to replace the original one.
head = &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(1000)}
block = types.NewBlockWithHeader(head)
header = &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(1000)}
block = types.NewBlockWithHeader(header)
ethash.Seal(nil, block, nil)

if work, err = api.GetWork(); err != nil || work[0] != block.HashNoNonce().Hex() {
t.Error("expect to return the latest pushed work")
}

// Push block with higher block number.
newHead := &types.Header{Number: big.NewInt(2), Difficulty: big.NewInt(100)}
newBlock := types.NewBlockWithHeader(newHead)
Expand All @@ -130,19 +129,18 @@ func TestRemoteSealer(t *testing.T) {

func TestHashRate(t *testing.T) {
var (
ethash = NewTester()
api = &API{ethash}
hashrate = []hexutil.Uint64{100, 200, 300}
expect uint64
ids = []common.Hash{common.HexToHash("a"), common.HexToHash("b"), common.HexToHash("c")}
)

ethash := NewTester(nil)
defer ethash.Close()

if tot := ethash.Hashrate(); tot != 0 {
t.Error("expect the result should be zero")
}

api := &API{ethash}
for i := 0; i < len(hashrate); i += 1 {
if res := api.SubmitHashRate(hashrate[i], ids[i]); !res {
t.Error("remote miner submit hashrate failed")
Expand All @@ -155,9 +153,8 @@ func TestHashRate(t *testing.T) {
}

func TestClosedRemoteSealer(t *testing.T) {
ethash := NewTester()
// Make sure exit channel has been listened
time.Sleep(1 * time.Second)
ethash := NewTester(nil)
time.Sleep(1 * time.Second) // ensure exit channel is listening
ethash.Close()

api := &API{ethash}
Expand Down
88 changes: 59 additions & 29 deletions consensus/ethash/sealer.go
Expand Up @@ -17,11 +17,14 @@
package ethash

import (
"bytes"
crand "crypto/rand"
"encoding/json"
"errors"
"math"
"math/big"
"math/rand"
"net/http"
"runtime"
"sync"
"time"
Expand Down Expand Up @@ -109,7 +112,7 @@ func (ethash *Ethash) mine(block *types.Block, id int, seed uint64, abort chan s
var (
header = block.Header()
hash = header.HashNoNonce().Bytes()
target = new(big.Int).Div(maxUint256, header.Difficulty)
target = new(big.Int).Div(two256, header.Difficulty)
number = header.Number.Uint64()
dataset = ethash.dataset(number)
)
Expand Down Expand Up @@ -161,40 +164,65 @@ search:
runtime.KeepAlive(dataset)
}

// remote starts a standalone goroutine to handle remote mining related stuff.
func (ethash *Ethash) remote() {
// remote is a standalone goroutine to handle remote mining related stuff.
func (ethash *Ethash) remote(notify []string) {
var (
works = make(map[common.Hash]*types.Block)
rates = make(map[common.Hash]hashrate)
currentWork *types.Block
works = make(map[common.Hash]*types.Block)
rates = make(map[common.Hash]hashrate)

currentBlock *types.Block
currentWork [3]string

notifyTransport = &http.Transport{}
notifyClient = &http.Client{
Transport: notifyTransport,
Timeout: time.Second,
}
notifyReqs = make([]*http.Request, len(notify))
)
// notifyWork notifies all the specified mining endpoints of the availability of
// new work to be processed.
notifyWork := func() {
work := currentWork
blob, _ := json.Marshal(work)

// getWork returns a work package for external miner.
for i, url := range notify {
// Terminate any previously pending request and create the new work
if notifyReqs[i] != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we set the notifyReq[i] as nil if the request has been sent successfully?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be racey. To be fair however, this current code might be racey too. I'll think about it.

notifyTransport.CancelRequest(notifyReqs[i])
}
notifyReqs[i], _ = http.NewRequest("POST", url, bytes.NewReader(blob))
notifyReqs[i].Header.Set("Content-Type", "application/json")

// Push the new work concurrently to all the remote nodes
go func(req *http.Request, url string) {
res, err := notifyClient.Do(req)
if err != nil {
log.Warn("Failed to notify remote miner", "err", err)
} else {
log.Trace("Notified remote miner", "miner", url, "hash", log.Lazy{Fn: func() common.Hash { return common.HexToHash(work[0]) }}, "target", work[2])
res.Body.Close()
}
}(notifyReqs[i], url)
}
}
// makeWork creates a work package for external miner.
//
// The work package consists of 3 strings:
// result[0], 32 bytes hex encoded current block header pow-hash
// result[1], 32 bytes hex encoded seed hash used for DAG
// result[2], 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty
getWork := func() ([3]string, error) {
var res [3]string
if currentWork == nil {
return res, errNoMiningWork
}
res[0] = currentWork.HashNoNonce().Hex()
res[1] = common.BytesToHash(SeedHash(currentWork.NumberU64())).Hex()
makeWork := func(block *types.Block) {
hash := block.HashNoNonce()

// Calculate the "target" to be returned to the external sealer.
n := big.NewInt(1)
n.Lsh(n, 255)
n.Div(n, currentWork.Difficulty())
n.Lsh(n, 1)
res[2] = common.BytesToHash(n.Bytes()).Hex()
currentWork[0] = hash.Hex()
currentWork[1] = common.BytesToHash(SeedHash(block.NumberU64())).Hex()
currentWork[2] = common.BytesToHash(new(big.Int).Div(two256, block.Difficulty()).Bytes()).Hex()

// Trace the seal work fetched by remote sealer.
works[currentWork.HashNoNonce()] = currentWork
return res, nil
currentBlock = block
works[hash] = block
}

// submitWork verifies the submitted pow solution, returning
// whether the solution was accepted or not (not can be both a bad pow as well as
// any other error, like no pending work or stale mining result).
Expand Down Expand Up @@ -238,21 +266,23 @@ func (ethash *Ethash) remote() {
for {
select {
case block := <-ethash.workCh:
if currentWork != nil && block.ParentHash() != currentWork.ParentHash() {
if currentBlock != nil && block.ParentHash() != currentBlock.ParentHash() {
// Start new round mining, throw out all previous work.
works = make(map[common.Hash]*types.Block)
}
// Update current work with new received block.
// Note same work can be past twice, happens when changing CPU threads.
currentWork = block
makeWork(block)

// Notify and requested URLs of the new work availability
notifyWork()

case work := <-ethash.fetchWorkCh:
// Return current mining work to remote miner.
miningWork, err := getWork()
if err != nil {
work.errc <- err
if currentBlock == nil {
work.errc <- errNoMiningWork
} else {
work.res <- miningWork
work.res <- currentWork
}

case result := <-ethash.submitWorkCh:
Expand Down