/
queue_parser_blocks.go
86 lines (74 loc) · 2.86 KB
/
queue_parser_blocks.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/*---------------------------------------------------------------------------------------------
* Copyright (c) IBAX. All rights reserved.
* See LICENSE in the project root for license information.
*--------------------------------------------------------------------------------------------*/
package daemons
import (
"context"
"fmt"
"sync/atomic"
"github.com/IBAX-io/go-ibax/packages/conf"
"github.com/IBAX-io/go-ibax/packages/conf/syspar"
"github.com/IBAX-io/go-ibax/packages/consts"
"github.com/IBAX-io/go-ibax/packages/storage/sqldb"
"github.com/IBAX-io/go-ibax/packages/utils"
log "github.com/sirupsen/logrus"
)
/* Take a block from the queue. If this block has a bigger block ID than the last block of our chain, then find the fork.
* If the fork begins less than variables->rollback_blocks blocks ago, then:
* - get the whole chain of blocks
* - roll back data from our blocks
* - insert the frontal data from the new chain
* - if there is no error, then roll back our data from the blocks
* - and insert the new data
* - if there are errors, then roll back to the former data
* */
// QueueParserBlocks takes one block from the block queue and, if it is ahead
// of the local chain, updates the local chain from the node that produced it.
//
// Only one run per daemon is allowed at a time: if d.atomic is already set the
// call returns nil immediately. The lock taken by DBLock is held for the rest
// of the call.
//
// It returns nil when the call is skipped or the queue is empty, the
// underlying error when reading the info block or the queue block fails, an
// error when the queued block is rejected (too far ahead, not newer than the
// local chain, produced by this node, or its node host cannot be resolved),
// and otherwise whatever UpdateChain returns.
func QueueParserBlocks(ctx context.Context, d *daemon) error {
	// Re-entrancy guard: the flag is claimed here and released on return.
	if !atomic.CompareAndSwapUint32(&d.atomic, 0, 1) {
		return nil
	}
	defer atomic.StoreUint32(&d.atomic, 0)

	DBLock()
	defer DBUnlock()

	infoBlock := &sqldb.InfoBlock{}
	if _, err := infoBlock.Get(); err != nil {
		d.logger.WithFields(log.Fields{"type": consts.DBError, "error": err}).Error("getting info block")
		return err
	}

	queueBlock := &sqldb.QueueBlock{}
	if _, err := queueBlock.Get(); err != nil {
		d.logger.WithFields(log.Fields{"type": consts.DBError, "error": err}).Error("getting queue block")
		return err
	}

	// An empty hash means nothing was loaded from the queue; this is not an error.
	if len(queueBlock.Hash) == 0 {
		d.logger.WithFields(log.Fields{"type": consts.NotFound}).Debug("queue block not found")
		return nil
	}

	// check if the block gets in the rollback_blocks_1 limit
	if queueBlock.BlockID > infoBlock.BlockID+syspar.GetRbBlocks1() {
		// NOTE(review): any result of the delete call is not inspected here.
		queueBlock.DeleteOldBlocks()
		return utils.ErrInfo("rollback_blocks_1")
	}

	// is it old block in queue ?
	if queueBlock.BlockID <= infoBlock.BlockID {
		// NOTE(review): any result of the delete call is not inspected here.
		queueBlock.DeleteOldBlocks()
		return utils.ErrInfo(fmt.Errorf("old block %d <= %d", queueBlock.BlockID, infoBlock.BlockID))
	}

	if queueBlock.HonorNodeID == conf.Config.KeyID {
		// The block ID goes into a structured field: passing it as a second
		// argument to Debug would append it to the message with no separator.
		// NOTE(review): the entry is not removed from the queue in this
		// branch - confirm that is intended.
		d.logger.WithFields(log.Fields{"type": consts.DuplicateObject, "block_id": queueBlock.BlockID}).Debug("queueBlock generated by myself")
		return utils.ErrInfo(fmt.Errorf("queueBlock generated by myself: %d", queueBlock.BlockID))
	}

	nodeHost, err := syspar.GetNodeHostByPosition(queueBlock.HonorNodeID)
	if err != nil {
		// The producing node cannot be resolved, so drop this queue entry.
		// NOTE(review): any result of the delete call is not inspected here.
		queueBlock.DeleteQueueBlockByHash()
		return utils.ErrInfo(err)
	}

	// update our chain till maxBlockID from the host
	return UpdateChain(ctx, d, utils.GetHostPort(nodeHost), queueBlock.BlockID)
}