This repository has been archived by the owner on May 13, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 346
/
process.go
126 lines (110 loc) · 3.12 KB
/
process.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package abci
import (
"context"
"crypto/sha256"
"fmt"
"sync"
"time"
"github.com/hyperledger/burrow/bcm"
"github.com/hyperledger/burrow/execution"
"github.com/hyperledger/burrow/txs"
"github.com/tendermint/tendermint/abci/types"
tmTypes "github.com/tendermint/tendermint/types"
)
type Process struct {
ticker *time.Ticker
committer execution.BatchCommitter
blockchain *bcm.Blockchain
done chan struct{}
panic func(error)
commitNeeded bool
txDecoder txs.Decoder
shutdownOnce sync.Once
}
// NewProcess returns a no-consensus ABCI process suitable for running a single node without Tendermint.
// The CheckTx function can be used to submit transactions which are processed according
func NewProcess(committer execution.BatchCommitter, blockchain *bcm.Blockchain, txDecoder txs.Decoder,
commitInterval time.Duration, panicFunc func(error)) *Process {
p := &Process{
committer: committer,
blockchain: blockchain,
done: make(chan struct{}),
txDecoder: txDecoder,
panic: panicFunc,
}
if commitInterval != 0 {
p.ticker = time.NewTicker(commitInterval)
go p.triggerCommits()
}
return p
}
func (p *Process) CheckTx(tx tmTypes.Tx, cb func(*types.Response)) error {
const header = "DeliverTx"
p.committer.Lock()
defer p.committer.Unlock()
// Skip check - deliver immediately
// FIXME: [Silas] this means that any transaction that a transaction that fails CheckTx
// that would not normally end up stored in state (as an exceptional tx) will get stored in state.
// This means that the same sequence of transactions fed to no consensus mode can give rise to a state with additional
// invalid transactions in state. Since the state hash is non-deterministic based on when the commits happen it's not
// clear this is a problem. The underlying state will be compatible.
checkTx := ExecuteTx(header, p.committer, p.txDecoder, tx)
cb(types.ToResponseCheckTx(checkTx))
p.commitNeeded = true
if p.ticker == nil {
err := p.commit()
if err != nil {
return err
}
}
return nil
}
func (p *Process) Shutdown(ctx context.Context) (err error) {
p.committer.Lock()
defer p.committer.Unlock()
p.shutdownOnce.Do(func() {
if p.ticker != nil {
p.ticker.Stop()
}
close(p.done)
})
return
}
func (p *Process) triggerCommits() {
for {
select {
case <-p.ticker.C:
p.commitOrPanic()
case <-p.done:
// Escape loop since ticket channel is never closed
}
}
}
func (p *Process) commitOrPanic() {
p.committer.Lock()
defer p.committer.Unlock()
err := p.commit()
if err != nil {
p.panic(err)
}
}
func (p *Process) commit() error {
const errHeader = "commit():"
if !p.commitNeeded {
return nil
}
appHash, err := p.committer.Commit(nil)
if err != nil {
return fmt.Errorf("%s could not Commit tx %v", errHeader, err)
}
// Maintain a basic hashed linked list, mixing in the appHash as we go
hasher := sha256.New()
hasher.Write(appHash)
hasher.Write(p.blockchain.LastBlockHash())
err = p.blockchain.CommitBlock(time.Now(), hasher.Sum(nil), appHash)
if err != nil {
return fmt.Errorf("%s could not CommitBlock %v", errHeader, err)
}
p.commitNeeded = false
return nil
}