-
Notifications
You must be signed in to change notification settings - Fork 0
/
ledger.go
429 lines (375 loc) · 15.7 KB
/
ledger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package ledger
import (
"bytes"
"errors"
"fmt"
"reflect"
"sync"
"github.com/golang/protobuf/proto"
"github.com/op/go-logging"
"github.com/openblockchain/obc-peer/events/producer"
"github.com/openblockchain/obc-peer/openchain/db"
"github.com/openblockchain/obc-peer/openchain/ledger/statemgmt"
"github.com/openblockchain/obc-peer/openchain/ledger/statemgmt/state"
"github.com/tecbot/gorocksdb"
"github.com/openblockchain/obc-peer/protos"
"golang.org/x/net/context"
)
// ledgerLogger is the logger used throughout this file; it is obtained from
// go-logging under the name "ledger".
var ledgerLogger = logging.MustGetLogger("ledger")
// Sentinel errors exported by the ledger package. Callers can compare a
// returned error against these values directly.
var (
// ErrOutOfBounds is returned if a request is out of bounds, e.g. a block
// number that is not below the current blockchain size.
ErrOutOfBounds = errors.New("ledger: out of bounds")
// ErrResourceNotFound is returned if a resource is not found
ErrResourceNotFound = errors.New("ledger: resource not found")
)
// Ledger - the struct for openchain ledger. It bundles the blockchain, the
// world state and the identifier of the batch that is currently in progress.
// newLedger constructs it with a positional literal, so the field order
// below must not change.
type Ledger struct {
// blockchain backs the block-related methods (size, lookup, persistence).
blockchain *blockchain
// state backs the world-state methods (get/set/delete, hashes, deltas).
state *state.State
// currentID is the ID passed to BeginTxBatch or ApplyStateDelta; it is nil
// whenever no batch or state delta is in progress.
currentID interface{}
}
// Package-level singleton state used by GetLedger.
var (
	// ledger is the shared instance created on the first GetLedger call.
	ledger *Ledger
	// ledgerError is the error (if any) produced while creating ledger.
	ledgerError error
	// once guards the one-time creation of ledger.
	once sync.Once
)
// GetLedger returns the package-wide 'singleton' ledger. The ledger is built
// by newLedger on the first call only; every later call returns the same
// pointer and the same error, so a failed first initialization is not retried.
func GetLedger() (*Ledger, error) {
	initialize := func() {
		ledger, ledgerError = newLedger()
	}
	once.Do(initialize)
	return ledger, ledgerError
}
// newLedger creates a Ledger backed by a new blockchain and a new state, with
// no batch in progress. It returns the error from newBlockchain unchanged, in
// which case no state is created.
func newLedger() (*Ledger, error) {
	chain, err := newBlockchain()
	if err != nil {
		return nil, err
	}
	return &Ledger{
		blockchain: chain,
		state:      state.NewState(),
		currentID:  nil,
	}, nil
}
/////////////////// Transaction-batch related methods ///////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////
// BeginTxBatch - gets invoked when next round of transaction-batch execution begins.
// It records id as the batch in progress and returns an error, leaving the
// ledger untouched, if another batch is already in progress. Because "in
// progress" is tracked as a non-nil currentID, a nil id does not mark a batch
// as started.
func (ledger *Ledger) BeginTxBatch(id interface{}) error {
	if err := ledger.checkValidIDBegin(); err != nil {
		return err
	}
	ledger.currentID = id
	return nil
}
// GetTXBatchPreviewBlock returns a preview block that will have the same
// block.GetHash() result as the block committed to the database if
// ledger.CommitTxBatch is called with the same parameters. If the state is
// modified by a transaction between these two calls, the hash will be
// different. The preview block does not include non-hashed data such as the
// local timestamp.
//
// id must match the batch currently in progress; otherwise an error is
// returned. An error from computing the state hash is returned unchanged.
func (ledger *Ledger) GetTXBatchPreviewBlock(id interface{},
	transactions []*protos.Transaction, metadata []byte) (*protos.Block, error) {
	if err := ledger.checkValidIDCommitORRollback(id); err != nil {
		return nil, err
	}
	stateHash, err := ledger.state.GetHash()
	if err != nil {
		return nil, err
	}
	candidate := protos.NewBlock(transactions, metadata)
	preview := ledger.blockchain.buildBlock(candidate, stateHash)
	return preview, nil
}
// CommitTxBatch - gets invoked when the current transaction-batch needs to be
// committed. It returns successfully iff the transaction details and the
// state changes (that may have happened during execution of this
// transaction-batch) have been committed to permanent storage.
//
// id must match the batch in progress; a mismatch is reported without
// touching the ledger. Any later failure (state hash, block preparation, DB
// write) discards the in-memory changes, reports the failure to the
// blockchain and returns the cause. On success the block and the state
// changes are written in one write batch and a block event is sent.
func (ledger *Ledger) CommitTxBatch(id interface{}, transactions []*protos.Transaction, transactionResults []*protos.TransactionResult, metadata []byte) error {
	if err := ledger.checkValidIDCommitORRollback(id); err != nil {
		return err
	}

	// abort performs the common failure handling and hands the cause back so
	// it can be returned directly.
	abort := func(cause error) error {
		ledger.resetForNextTxGroup(false)
		ledger.blockchain.blockPersistenceStatus(false)
		return cause
	}

	stateHash, err := ledger.state.GetHash()
	if err != nil {
		return abort(err)
	}

	// Block and state changes are collected in one batch and written together.
	batch := gorocksdb.NewWriteBatch()
	defer batch.Destroy()

	block := protos.NewBlock(transactions, metadata)
	block.NonHashData = &protos.NonHashData{TransactionResults: transactionResults}
	blockNumber, err := ledger.blockchain.addPersistenceChangesForNewBlock(context.TODO(), block, stateHash, batch)
	if err != nil {
		return abort(err)
	}
	ledger.state.AddChangesForPersistence(blockNumber, batch)

	writeOpts := gorocksdb.NewDefaultWriteOptions()
	defer writeOpts.Destroy()
	if writeErr := db.GetDBHandle().DB.Write(writeOpts, batch); writeErr != nil {
		return abort(writeErr)
	}

	ledger.resetForNextTxGroup(true)
	ledger.blockchain.blockPersistenceStatus(true)
	sendProducerBlockEvent(block)
	return nil
}
// RollbackTxBatch - Discards all the state changes that may have taken place
// during the execution of the current transaction-batch. id must match the
// batch in progress, otherwise an error is returned and nothing is discarded.
func (ledger *Ledger) RollbackTxBatch(id interface{}) error {
	// NOTE(review): this passes a format string to Debug; confirm the vendored
	// go-logging version formats it rather than printing it verbatim.
	ledgerLogger.Debug("RollbackTxBatch for id = [%s]", id)
	if err := ledger.checkValidIDCommitORRollback(id); err != nil {
		return err
	}
	ledger.resetForNextTxGroup(false)
	return nil
}
// TxBegin - Marks the beginning of a new transaction in the ongoing batch by
// forwarding txUUID to the state.
func (ledger *Ledger) TxBegin(txUUID string) {
	worldState := ledger.state
	worldState.TxBegin(txUUID)
}
// TxFinished - Marks the finish of the on-going transaction by forwarding
// txUUID and the outcome to the state. If txSuccessful is false, the state
// changes made by the transaction are discarded.
func (ledger *Ledger) TxFinished(txUUID string, txSuccessful bool) {
	worldState := ledger.state
	worldState.TxFinish(txUUID, txSuccessful)
}
/////////////////// world-state related methods /////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////
// GetTempStateHash - Computes the state hash by taking into account the state
// changes that may have taken place during the execution of the current
// transaction-batch. The result is whatever the state's GetHash reports.
func (ledger *Ledger) GetTempStateHash() ([]byte, error) {
	hash, err := ledger.state.GetHash()
	return hash, err
}
// GetTempStateHashWithTxDeltaStateHashes - In addition to the state hash (as
// defined in method GetTempStateHash), this method returns a map
// [txUuid of Tx --> cryptoHash(stateChangesMadeByTx)].
// Only successful txs appear in this map.
// The map is returned even when computing the state hash fails; check the
// error before relying on the hash.
func (ledger *Ledger) GetTempStateHashWithTxDeltaStateHashes() ([]byte, map[string][]byte, error) {
	hash, err := ledger.state.GetHash()
	txDeltaHashes := ledger.state.GetTxStateDeltaHash()
	return hash, txDeltaHashes, err
}
// GetState gets the state for chaincodeID and key. If committed is false,
// this first looks in memory and, if missing, pulls from the db. If committed
// is true, this pulls from the db only.
func (ledger *Ledger) GetState(chaincodeID string, key string, committed bool) ([]byte, error) {
	value, err := ledger.state.Get(chaincodeID, key, committed)
	return value, err
}
// GetStateRangeScanIterator returns an iterator to get all the keys (and
// values) between startKey and endKey (assuming lexical order of the keys)
// for a chaincodeID.
// If committed is true, the key-values are retrieved only from the db. If
// committed is false, the results from the db are merged with the results in
// memory (giving preference to in-memory data).
// The key-values in the returned iterator are not guaranteed to be in any
// specific order.
func (ledger *Ledger) GetStateRangeScanIterator(chaincodeID string, startKey string, endKey string, committed bool) (statemgmt.RangeScanIterator, error) {
	worldState := ledger.state
	return worldState.GetRangeScanIterator(chaincodeID, startKey, endKey, committed)
}
// SetState sets the state to the given value for chaincodeID and key. It does
// not immediately write to the DB.
func (ledger *Ledger) SetState(chaincodeID string, key string, value []byte) error {
	err := ledger.state.Set(chaincodeID, key, value)
	return err
}
// DeleteState tracks the deletion of the state for chaincodeID and key. It
// does not immediately write to the DB.
func (ledger *Ledger) DeleteState(chaincodeID string, key string) error {
	err := ledger.state.Delete(chaincodeID, key)
	return err
}
// GetStateSnapshot returns a point-in-time view of the global state for the
// current block. This should be used when transferring the state from one
// peer to another peer. You must call stateSnapshot.Release() once you are
// done with the snapshot to free up resources.
//
// If the blockchain size cannot be read from the DB snapshot, the DB snapshot
// is released here and the error is returned.
func (ledger *Ledger) GetStateSnapshot() (*state.StateSnapshot, error) {
	snapshot := db.GetDBHandle().GetSnapshot()
	chainSize, err := fetchBlockchainSizeFromSnapshot(snapshot)
	if err == nil {
		return ledger.state.GetSnapshot(chainSize, snapshot)
	}
	snapshot.Release()
	return nil, err
}
// GetStateDelta will return the state delta for the specified block if
// available. ErrOutOfBounds is returned when blockNumber is not below the
// current blockchain size.
func (ledger *Ledger) GetStateDelta(blockNumber uint64) (*statemgmt.StateDelta, error) {
	if size := ledger.GetBlockchainSize(); blockNumber >= size {
		return nil, ErrOutOfBounds
	}
	return ledger.state.FetchStateDeltaFromDB(blockNumber)
}
// ApplyStateDelta applies a state delta to the current state. This is an
// in-memory change only. You must call ledger.CommitStateDelta to persist
// the change to the DB.
//
// This should only be used as part of state synchronization. State deltas
// can be retrieved from another peer through the Ledger.GetStateDelta
// function or by creating state deltas with keys retrieved from
// Ledger.GetStateSnapshot(). For an example, see TestSetRawState in
// ledger_test.go.
//
// Note that there is no order checking in this function and it is up to
// the caller to ensure that deltas are applied in the correct order.
// For example, if you are currently at block 8 and call this function
// with a delta retrieved from Ledger.GetStateDelta(10), you would now
// be in a bad state because you did not apply the delta for block 9.
//
// It's possible to roll the state forwards or backwards using
// stateDelta.RollBackwards. By default, a delta retrieved for block 3 can
// be used to roll forwards from state at block 2 to state at block 3. If
// stateDelta.RollBackwards=true, the delta retrieved for block 3 can be
// used to roll backwards from the state at block 3 to the state at block 2.
// NOTE(review): this sentence previously read "RollBackwards=false", which
// contradicts the stated default; confirm against statemgmt.StateDelta.
//
// An error is returned, and nothing is applied, if another batch or delta is
// already in progress; otherwise id becomes the batch in progress.
func (ledger *Ledger) ApplyStateDelta(id interface{}, delta *statemgmt.StateDelta) error {
	if err := ledger.checkValidIDBegin(); err != nil {
		return err
	}
	ledger.currentID = id
	ledger.state.ApplyStateDelta(delta)
	return nil
}
// CommitStateDelta will commit the state delta passed to
// ledger.ApplyStateDelta to the DB.
//
// id must match the ID given to ApplyStateDelta; otherwise an error is
// returned and the pending delta is left in place. After the commit attempt
// the ledger is always reset for the next batch. The reset is told whether
// the commit actually succeeded, so a failed commit is not recorded as
// persisted (the same convention CommitTxBatch follows on its failure paths).
// The error from the state's CommitStateDelta, if any, is returned unchanged.
func (ledger *Ledger) CommitStateDelta(id interface{}) error {
	err := ledger.checkValidIDCommitORRollback(id)
	if err != nil {
		return err
	}
	// Deferred so the reset still runs if the commit panics; err is read when
	// the deferred call executes, i.e. after the commit attempt.
	defer func() {
		ledger.resetForNextTxGroup(err == nil)
	}()
	err = ledger.state.CommitStateDelta()
	return err
}
// RollbackStateDelta will discard the state delta passed to
// ledger.ApplyStateDelta. id must match the ID given to ApplyStateDelta,
// otherwise an error is returned and nothing is discarded.
func (ledger *Ledger) RollbackStateDelta(id interface{}) error {
	if err := ledger.checkValidIDCommitORRollback(id); err != nil {
		return err
	}
	ledger.resetForNextTxGroup(false)
	return nil
}
// DeleteALLStateKeysAndValues deletes all keys and values from the state.
// This is generally only used during state synchronization when creating a
// new state from a snapshot.
func (ledger *Ledger) DeleteALLStateKeysAndValues() error {
	err := ledger.state.DeleteState()
	return err
}
/////////////////// blockchain related methods /////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////
// GetBlockchainInfo returns information about the blockchain ledger such as
// height, current block hash, and previous block hash.
func (ledger *Ledger) GetBlockchainInfo() (*protos.BlockchainInfo, error) {
	chain := ledger.blockchain
	return chain.getBlockchainInfo()
}
// GetBlockByNumber returns the block with the given number on the
// blockchain. The lowest block on the chain is block number zero.
// ErrOutOfBounds is returned when blockNumber is not below the current
// blockchain size.
func (ledger *Ledger) GetBlockByNumber(blockNumber uint64) (*protos.Block, error) {
	if size := ledger.GetBlockchainSize(); blockNumber >= size {
		return nil, ErrOutOfBounds
	}
	return ledger.blockchain.getBlock(blockNumber)
}
// GetBlockchainSize returns the number of blocks in the blockchain.
func (ledger *Ledger) GetBlockchainSize() uint64 {
	chain := ledger.blockchain
	return chain.getSize()
}
// GetTransactionByUUID returns a transaction by its uuid.
func (ledger *Ledger) GetTransactionByUUID(txUUID string) (*protos.Transaction, error) {
	chain := ledger.blockchain
	return chain.getTransactionByUUID(txUUID)
}
// PutRawBlock puts a raw block on the chain. This function should only be
// used for synchronization between peers. A block event is sent only after
// the block has been persisted; a persistence error is returned unchanged
// and no event is sent.
func (ledger *Ledger) PutRawBlock(block *protos.Block, blockNumber uint64) error {
	if err := ledger.blockchain.persistRawBlock(block, blockNumber); err != nil {
		return err
	}
	sendProducerBlockEvent(block)
	return nil
}
// VerifyChain will verify the integrity of the blockchain. This is
// accomplished by ensuring that the previous block hash stored in each block
// matches the actual hash of the previous block in the chain.
//
// Blocks are checked from highBlock down to lowBlock+1, and the walk stops at
// the first mismatch. The return value is therefore the highest block number
// in the range whose stored previous-block hash does not match. For example,
// if VerifyChain(99, 0) is called and the previous hash values stored in
// blocks 8, 32, and 42 do not match the actual hashes of the respective
// previous blocks, 42 would be the return value from this function.
// When every link in the range verifies, 0 and a nil error are returned
// (block 0 can never be reported as a mismatch, because it has no
// predecessor in the checked range).
//
// highBlock is the high block in the chain to include in verification. If you
// wish to verify the entire chain, use ledger.GetBlockchainSize() - 1.
// lowBlock is the low block in the chain to include in verification. If
// you wish to verify the entire chain, use 0 for the genesis block.
//
// ErrOutOfBounds is returned when highBlock is not below the blockchain size
// (together with highBlock) or when highBlock <= lowBlock (together with
// lowBlock). If a block cannot be fetched, is nil, or cannot be hashed, the
// number of that block is returned along with an error naming it.
func (ledger *Ledger) VerifyChain(highBlock, lowBlock uint64) (uint64, error) {
	if highBlock >= ledger.GetBlockchainSize() {
		return highBlock, ErrOutOfBounds
	}
	if highBlock <= lowBlock {
		return lowBlock, ErrOutOfBounds
	}

	for i := highBlock; i > lowBlock; i-- {
		currentBlock, err := ledger.GetBlockByNumber(i)
		if err != nil {
			return i, fmt.Errorf("Error fetching block %d.", i)
		}
		if currentBlock == nil {
			return i, fmt.Errorf("Block %d is nil.", i)
		}

		previousBlock, err := ledger.GetBlockByNumber(i - 1)
		if err != nil {
			// The failing block is i-1, so the message must name i-1 to agree
			// with the returned block number.
			return i - 1, fmt.Errorf("Error fetching block %d.", i-1)
		}
		if previousBlock == nil {
			return i - 1, fmt.Errorf("Block %d is nil.", i-1)
		}

		previousBlockHash, err := previousBlock.GetHash()
		if err != nil {
			return i - 1, fmt.Errorf("Error calculating block hash for block %d.", i-1)
		}
		if !bytes.Equal(previousBlockHash, currentBlock.PreviousBlockHash) {
			return i, nil
		}
	}
	return 0, nil
}
// checkValidIDBegin reports whether a new batch (or state delta) may be
// started. It returns an error naming the batch in progress when currentID is
// non-nil, and nil otherwise.
func (ledger *Ledger) checkValidIDBegin() error {
	if ledger.currentID != nil {
		// %v rather than %s: IDs are arbitrary interface{} values, and %s
		// renders non-string IDs as "%!s(int=1)".
		return fmt.Errorf("Another TxGroup [%v] already in-progress", ledger.currentID)
	}
	return nil
}
// checkValidIDCommitORRollback verifies that id identifies the batch that is
// currently in progress. IDs are compared with reflect.DeepEqual, so
// non-comparable IDs such as slices are supported. It returns nil on a match
// and an error naming the batch in progress otherwise.
// NOTE(review): when no batch is in progress the message still reads
// "Another TxGroup [<nil>] already in-progress".
func (ledger *Ledger) checkValidIDCommitORRollback(id interface{}) error {
	if !reflect.DeepEqual(ledger.currentID, id) {
		// %v rather than %s: IDs are arbitrary interface{} values, and %s
		// renders non-string IDs as "%!s(int=1)".
		return fmt.Errorf("Another TxGroup [%v] already in-progress", ledger.currentID)
	}
	return nil
}
// resetForNextTxGroup ends the batch in progress: it clears currentID so a
// new batch can begin and asks the state to clear its in-memory changes.
// txCommited is forwarded to the state and indicates whether those changes
// were committed.
func (ledger *Ledger) resetForNextTxGroup(txCommited bool) {
	ledgerLogger.Debug("resetting ledger state for next transaction batch")

	ledger.currentID = nil
	worldState := ledger.state
	worldState.ClearInMemoryChanges(txCommited)
}
// sendProducerBlockEvent publishes a block event for block through the event
// producer.
//
// The payload of deploy transactions (Transaction_CHAINCODE_NEW) is replaced
// in the event by a deployment spec without its CodePackage. This is done to
// make block events more lightweight, as the payload for these types of
// transactions can be very large.
//
// The stripping is applied to a clone of the block, never to block itself, so
// the transactions owned by the caller keep their full payload. The clone is
// only made when at least one payload is actually rewritten; otherwise the
// original block is sent as-is. A deploy transaction whose payload cannot be
// unmarshalled or re-marshalled is logged and left unchanged in the event.
func sendProducerBlockEvent(block *protos.Block) {
	eventBlock := block
	for i, transaction := range block.GetTransactions() {
		if transaction.Type != protos.Transaction_CHAINCODE_NEW {
			continue
		}
		deploymentSpec := &protos.ChaincodeDeploymentSpec{}
		if err := proto.Unmarshal(transaction.Payload, deploymentSpec); err != nil {
			ledgerLogger.Error(fmt.Sprintf("Error unmarshalling deployment transaction for block event: %s", err))
			continue
		}
		deploymentSpec.CodePackage = nil
		deploymentSpecBytes, err := proto.Marshal(deploymentSpec)
		if err != nil {
			ledgerLogger.Error(fmt.Sprintf("Error marshalling deployment transaction for block event: %s", err))
			continue
		}
		// Clone lazily, on the first rewrite, so blocks without deploy
		// transactions are not copied.
		if eventBlock == block {
			eventBlock = proto.Clone(block).(*protos.Block)
		}
		eventBlock.GetTransactions()[i].Payload = deploymentSpecBytes
	}
	producer.Send(producer.CreateBlockEvent(eventBlock))
}