Skip to content

Commit

Permalink
add support for get_work
Browse files Browse the repository at this point in the history
  • Loading branch information
asdvxgxasjab committed Jul 23, 2019
1 parent 6e3a7b8 commit 03bf11b
Show file tree
Hide file tree
Showing 3 changed files with 211 additions and 3 deletions.
13 changes: 10 additions & 3 deletions miner.go
Expand Up @@ -247,9 +247,16 @@ func (m *Miner) Shutdown() {
// Create a new block off of the given tip block.
func (m *Miner) createNextBlock(tipID BlockID, tipHeader *BlockHeader) (*Block, error) {
log.Printf("Miner %d mining new block from current tip %s\n", m.num, tipID)
pubKey := m.pubKeys[m.keyIndex]
return createNextBlock(tipID, tipHeader, m.txQueue, m.blockStore, pubKey, m.memo)
}

// Called by the miner as well as the peer to support get_work.
func createNextBlock(tipID BlockID, tipHeader *BlockHeader, txQueue TransactionQueue,
blockStore BlockStorage, pubKey ed25519.PublicKey, memo string) (*Block, error) {

// fetch transactions to confirm from the queue
txs := m.txQueue.Get(MAX_TRANSACTIONS_TO_INCLUDE_PER_BLOCK - 1)
txs := txQueue.Get(MAX_TRANSACTIONS_TO_INCLUDE_PER_BLOCK - 1)

// calculate total fees
var fees int64 = 0
Expand All @@ -262,13 +269,13 @@ func (m *Miner) createNextBlock(tipID BlockID, tipHeader *BlockHeader) (*Block,
reward := BlockCreationReward(newHeight) + fees

// build coinbase
tx := NewTransaction(nil, m.pubKeys[m.keyIndex], reward, 0, 0, 0, newHeight, m.memo)
tx := NewTransaction(nil, pubKey, reward, 0, 0, 0, newHeight, memo)

// prepend coinbase
txs = append([]*Transaction{tx}, txs...)

// compute the next target
newTarget, err := computeTarget(tipHeader, m.blockStore)
newTarget, err := computeTarget(tipHeader, blockStore)
if err != nil {
return nil, err
}
Expand Down
170 changes: 170 additions & 0 deletions peer.go
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/json"
"fmt"
"log"
"math/rand"
"net/http"
"net/url"
"sync"
Expand Down Expand Up @@ -38,6 +39,10 @@ type Peer struct {
filterLock sync.RWMutex
filter *cuckoo.Filter
addrChan chan<- string
workBlock *Block
medianTimestamp int64
pubKeys []ed25519.PublicKey
memo string
readLimitLock sync.RWMutex
readLimit int64
closeHandler func()
Expand Down Expand Up @@ -206,6 +211,10 @@ func (p *Peer) run() {
onConnectChan <- true
}()

// written to by the reader loop to update the current work block for the peer
getWorkChan := make(chan GetWorkMessage, 1)
submitWorkChan := make(chan SubmitWorkMessage, 1)

// writer goroutine loop
p.wg.Add(1)
go func() {
Expand Down Expand Up @@ -257,6 +266,9 @@ func (p *Peer) run() {
// update read limit if necessary
p.updateReadLimit()

// create and send out new work if necessary
p.createNewWorkBlock(tip.BlockID, tip.Block.Header)

if tip.Source == p.conn.RemoteAddr().String() {
// this is who sent us the block that caused the change
break
Expand Down Expand Up @@ -306,6 +318,9 @@ func (p *Peer) run() {
}

case newTx := <-newTxChan:
// update work if necessary
p.updateWorkBlock(newTx.TransactionID, newTx.Transaction)

if newTx.Source == p.conn.RemoteAddr().String() {
// this is who sent it to us
break
Expand Down Expand Up @@ -349,6 +364,12 @@ func (p *Peer) run() {
p.conn.Close()
}

case gw := <-getWorkChan:
p.onGetWork(gw)

case sw := <-submitWorkChan:
p.onSubmitWork(sw)

case <-tickerPing.C:
//log.Printf("Sending ping message to: %s\n", p.conn.RemoteAddr())
p.conn.SetWriteDeadline(time.Now().Add(writeWait))
Expand Down Expand Up @@ -659,6 +680,24 @@ func (p *Peer) run() {
},
}

case "get_work":
var gw GetWorkMessage
if err := json.Unmarshal(body, &gw); err != nil {
log.Printf("Error: %s, from: %s\n", err, p.conn.RemoteAddr())
return
}
log.Printf("Received get_work message, from: %s\n", p.conn.RemoteAddr())
getWorkChan <- gw

case "submit_work":
var sw SubmitWorkMessage
if err := json.Unmarshal(body, &sw); err != nil {
log.Printf("Error: %s, from: %s\n", err, p.conn.RemoteAddr())
return
}
log.Printf("Received submit_work message, from: %s\n", p.conn.RemoteAddr())
submitWorkChan <- sw

default:
log.Printf("Unknown message: %s, from: %s\n", m.Type, p.conn.RemoteAddr())
}
Expand Down Expand Up @@ -1445,6 +1484,137 @@ func (p *Peer) onPeerAddresses(addresses []string) {
}
}

// Called from the writer goroutine loop
func (p *Peer) onGetWork(gw GetWorkMessage) {
var err error
if p.workBlock != nil {
err = fmt.Errorf("Peer already has work")
} else if len(gw.PublicKeys) == 0 {
err = fmt.Errorf("No public keys specified")
} else if len(gw.Memo) > MAX_MEMO_LENGTH {
err = fmt.Errorf("Max memo length (%d) exceeded: %d", MAX_MEMO_LENGTH, len(gw.Memo))
} else {
var tipID *BlockID
var tipHeader *BlockHeader
tipID, tipHeader, _, err = getChainTipHeader(p.ledger, p.blockStore)
if err != nil {
log.Printf("Error getting tip header: %s, for: %s\n", err, p.conn.RemoteAddr())
} else {
// create and send out new work
p.pubKeys = gw.PublicKeys
p.memo = gw.Memo
p.createNewWorkBlock(*tipID, tipHeader)
}
}

if err != nil {
m := Message{Type: "work", Body: WorkMessage{Error: err.Error()}}
p.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := p.conn.WriteJSON(m); err != nil {
log.Printf("Write error: %s, to: %s\n", err, p.conn.RemoteAddr())
p.conn.Close()
}
}
}

// Create a new work block for a mining peer. Called from the writer goroutine loop.
func (p *Peer) createNewWorkBlock(tipID BlockID, tipHeader *BlockHeader) error {
if len(p.pubKeys) == 0 {
// peer doesn't want work
return nil
}

medianTimestamp, err := computeMedianTimestamp(tipHeader, p.blockStore)
if err != nil {
log.Printf("Error computing median timestamp: %s, for: %s\n", err, p.conn.RemoteAddr())
} else {
// create a new block
p.medianTimestamp = medianTimestamp
keyIndex := rand.Intn(len(p.pubKeys))
p.workBlock, err = createNextBlock(tipID, tipHeader, p.txQueue, p.blockStore, p.pubKeys[keyIndex], p.memo)
if err != nil {
log.Printf("Error creating next block: %s, for: %s\n", err, p.conn.RemoteAddr())
}
}

m := Message{Type: "work"}
if err != nil {
m.Body = WorkMessage{Error: err.Error()}
} else {
m.Body = WorkMessage{Header: p.workBlock.Header, MinTime: p.medianTimestamp + 1}
}

p.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := p.conn.WriteJSON(m); err != nil {
log.Printf("Write error: %s, to: %s\n", err, p.conn.RemoteAddr())
p.conn.Close()
return err
}
return err
}

// Add the transaction to the current work block for the mining peer. Called from the writer goroutine loop.
func (p *Peer) updateWorkBlock(id TransactionID, tx *Transaction) error {
if p.workBlock == nil {
// peer doesn't want work
return nil
}

if MAX_TRANSACTIONS_TO_INCLUDE_PER_BLOCK != 0 &&
len(p.workBlock.Transactions) >= MAX_TRANSACTIONS_TO_INCLUDE_PER_BLOCK {
log.Printf("Per-block transaction limit hit (%d)\n", len(p.workBlock.Transactions))
return nil
}

m := Message{Type: "work"}

// add the transaction to the block (it updates the coinbase fee)
err := p.workBlock.AddTransaction(id, tx)
if err != nil {
log.Printf("Error adding new transaction %s to block: %s\n", id, err)
// abandon the block
p.workBlock = nil
m.Body = WorkMessage{Error: err.Error()}
} else {
// send out the new work
m.Body = WorkMessage{Header: p.workBlock.Header, MinTime: p.medianTimestamp + 1}
}

p.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := p.conn.WriteJSON(m); err != nil {
log.Printf("Write error: %s, to: %s\n", err, p.conn.RemoteAddr())
p.conn.Close()
return err
}

return err
}

// Handle a submission of mining work. Called from the writer goroutine loop.
func (p *Peer) onSubmitWork(sw SubmitWorkMessage) {
m := Message{Type: "submit_work_result"}
id, err := sw.Header.ID()

if err != nil {
log.Printf("Error computing block ID: %s, from: %s\n", err, p.conn.RemoteAddr())
} else {
p.workBlock.Header = sw.Header
err = p.processor.ProcessBlock(id, p.workBlock, p.conn.RemoteAddr().String())
log.Printf("Error processing work block: %s, for: %s\n", err, p.conn.RemoteAddr())
p.workBlock = nil
}

if err != nil {
m.Body = SubmitWorkResultMessage{Error: err.Error()}
}

p.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := p.conn.WriteJSON(m); err != nil {
log.Printf("Write error: %s, to: %s\n", err, p.conn.RemoteAddr())
p.conn.Close()
}
}

// Update the read limit if necessary
func (p *Peer) updateReadLimit() {
ok, height, err := IsInitialBlockDownload(p.ledger, p.blockStore)
Expand Down
31 changes: 31 additions & 0 deletions protocol.go
Expand Up @@ -209,3 +209,34 @@ type TransactionRelayPolicyMessage struct {
MinFee int64 `json:"min_fee"`
MinAmount int64 `json:"min_amount"`
}

// GetWorkMessage is used by a mining peer to request mining work.
// Type: "get_work"
type GetWorkMessage struct {
PublicKeys []ed25519.PublicKey `json:"public_keys"`
Memo string `json:"memo,omitempty"`
}

// WorkMessage is used by a client to send work to perform to a mining peer.
// The timestamp and nonce in the header can be manipulated by the mining peer.
// It is the mining peer's responsibility to ensure the timestamp is not set below
// the minimum timestamp and that the nonce does not exceed MAX_NUMBER (2^53-1).
// Type: "work"
type WorkMessage struct {
Header *BlockHeader `json:"header"`
MinTime int64 `json:"min_time"`
Error string `json:"error,omitempty"`
}

// SubmitWorkMessage is used by a mining peer to submit a potential solution to the client.
// Type: "submit_work"
type SubmitWorkMessage struct {
BlockID BlockID `json:"block_id"`
Header *BlockHeader `json:"header"`
}

// SubmitWorkResultMessage is used to inform a mining peer of the result of its work.
// Type: "submit_work_result"
type SubmitWorkResultMessage struct {
Error string `json:"error,omitempty"`
}

0 comments on commit 03bf11b

Please sign in to comment.