-
Notifications
You must be signed in to change notification settings - Fork 127
/
queue.go
83 lines (75 loc) · 2.23 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package kernel
import (
"github.com/MixinNetwork/mixin/common"
"github.com/MixinNetwork/mixin/crypto"
"github.com/MixinNetwork/mixin/logger"
)
func (node *Node) QueueTransaction(tx *common.VersionedTransaction) (string, error) {
err := tx.Validate(node.persistStore)
if err != nil {
return "", err
}
err = node.persistStore.CachePutTransaction(tx)
if err != nil {
return "", err
}
err = node.QueueAppendSnapshot(node.IdForNetwork, &common.Snapshot{
Version: common.SnapshotVersion,
NodeId: node.IdForNetwork,
Transaction: tx.PayloadHash(),
}, false)
return tx.PayloadHash().String(), err
}
func (node *Node) LoadCacheToQueue() error {
return node.persistStore.CacheListTransactions(func(tx *common.VersionedTransaction) error {
return node.QueueAppendSnapshot(node.IdForNetwork, &common.Snapshot{
Version: common.SnapshotVersion,
NodeId: node.IdForNetwork,
Transaction: tx.PayloadHash(),
}, false)
})
}
func (node *Node) ConsumeQueue() error {
node.persistStore.QueuePollSnapshots(func(peerId crypto.Hash, snap *common.Snapshot) error {
m := &CosiAction{PeerId: peerId, Snapshot: snap}
if snap.Version == 0 {
m.Action = CosiActionFinalization
m.Snapshot.Hash = snap.PayloadHash()
} else if snap.Signature != nil {
m.Action = CosiActionFinalization
m.Snapshot.Hash = snap.PayloadHash()
} else if snap.NodeId != node.IdForNetwork {
m.Action = CosiActionExternalAnnouncement
m.Snapshot.Hash = snap.PayloadHash()
} else {
m.Action = CosiActionSelfEmpty
}
if m.Action != CosiActionFinalization {
node.cosiActionsChan <- m
return nil
}
tx, err := node.persistStore.CacheGetTransaction(snap.Transaction)
if err != nil {
return err
}
if tx != nil {
node.cosiActionsChan <- m
return nil
}
tx, _, err = node.persistStore.ReadTransaction(snap.Transaction)
if err != nil {
return err
}
if tx != nil {
node.cosiActionsChan <- m
return nil
}
if peerId == node.IdForNetwork {
return nil
}
logger.Debugf("ConsumeQueue finalized snapshot without transaction %s %s %s\n", peerId, snap.Hash, snap.Transaction)
node.Peer.SendTransactionRequestMessage(peerId, snap.Transaction)
return node.QueueAppendSnapshot(peerId, snap, true)
})
return nil
}