// c_event_callbacks.go

package gossip
import (
"errors"
"math/big"
"sync/atomic"
"github.com/TechPay-io/sirius-base/gossip/dagprocessor"
"github.com/TechPay-io/sirius-base/hash"
"github.com/TechPay-io/sirius-base/inter/dag"
"github.com/ethereum/go-ethereum/common"
"github.com/TechPay-io/go-photon/eventcheck"
"github.com/TechPay-io/go-photon/eventcheck/epochcheck"
"github.com/TechPay-io/go-photon/gossip/blockproc"
"github.com/TechPay-io/go-photon/gossip/emitter"
"github.com/TechPay-io/go-photon/inter"
"github.com/TechPay-io/go-photon/utils/concurrent"
)
// Sentinel errors returned by the event building / processing pipeline.
var (
	// returned by processEvent when the service has already been stopped
	errStopped = errors.New("service is stopped")
	// returned by processSavedEvent when the event's claimed median time
	// doesn't match our own dagIndexer calculation
	errWrongMedianTime = errors.New("wrong event median time")
	// returned by processEvent when the event's PrevEpochHash doesn't match
	// the local epoch state hash
	errWrongEpochHash = errors.New("wrong event epoch hash")
)
// buildEvent finalizes a half-built event payload: it assigns a unique ID,
// optionally embeds the node version and the previous epoch hash, indexes the
// event in the DAG indexer (without flushing), stamps the median time,
// computes the event's gas power, and finally passes the event to
// s.engine.Build. Any indexer state added here is dropped on return via
// DropNotFlushed, so the call leaves no persistent index changes.
//
// onIndexed, if non-nil, is invoked after the event has been added to the
// DAG indexer but before median time and gas power are computed.
func (s *Service) buildEvent(e *inter.MutableEventPayload, onIndexed func()) error {
	// set some unique ID
	e.SetID(s.uniqueEventIDs.sample())
	// node version: attached only to the validator's first events (Seq <= 1),
	// and only if it fits within the DAG extra-data size limit
	if e.Seq() <= 1 && len(s.config.Emitter.VersionToPublish) > 0 {
		version := []byte("v-" + s.config.Emitter.VersionToPublish)
		if uint32(len(version)) <= s.store.GetRules().Dag.MaxExtraData {
			e.SetExtra(version)
		}
	}
	// set PrevEpochHash on the earliest events of an epoch (Lamport <= 1)
	if e.Lamport() <= 1 {
		prevEpochHash := s.store.GetEpochState().Hash()
		e.SetPrevEpochHash(&prevEpochHash)
	}
	// indexing event without saving; revert the in-memory index additions on exit
	defer s.dagIndexer.DropNotFlushed()
	err := s.dagIndexer.Add(e)
	if err != nil {
		return err
	}
	if onIndexed != nil {
		onIndexed()
	}
	// median time can only be computed after the event is in the indexer
	e.SetMedianTime(s.dagIndexer.MedianTime(e.ID(), s.store.GetEpochState().EpochStart))
	// calc initial GasPower
	e.SetGasPowerUsed(epochcheck.CalcGasPowerUsed(e, s.store.GetRules()))
	var selfParent *inter.Event
	if e.SelfParent() != nil {
		selfParent = s.store.GetEvent(*e.SelfParent())
	}
	availableGasPower, err := s.checkers.Gaspowercheck.CalcGasPower(e, selfParent)
	if err != nil {
		return err
	}
	// refuse to build an event that would exceed the available gas power
	if e.GasPowerUsed() > availableGasPower.Min() {
		return emitter.ErrNotEnoughGasPower
	}
	e.SetGasPowerLeft(availableGasPower.Sub(e.GasPowerUsed()))
	return s.engine.Build(e)
}
// processSavedEvent performs the processing steps which require the event to
// already be saved in the DB: DAG indexing, median-time validation, and the
// aBFT consensus processing.
func (s *Service) processSavedEvent(e *inter.EventPayload, es *blockproc.EpochState) error {
	if err := s.dagIndexer.Add(e); err != nil {
		return err
	}
	// validate the median time claimed by the event against our own calculation
	expectedMedianTime := s.dagIndexer.MedianTime(e.ID(), es.EpochStart)
	if e.MedianTime() != expectedMedianTime {
		return errWrongMedianTime
	}
	// aBFT processing
	return s.engine.Process(e)
}
// saveAndProcessEvent saves the event, runs DB-dependent processing on it, and
// deletes the event again if that processing fails — so an event rejected
// during validation leaves no trace in the store.
func (s *Service) saveAndProcessEvent(e *inter.EventPayload, es *blockproc.EpochState) error {
	fixEventTxHashes(e)
	// persist the event so that index-dependent processing can read it
	s.store.SetEvent(e)
	defer s.dagIndexer.DropNotFlushed()
	if err := s.processSavedEvent(e, es); err != nil {
		// roll back the save of an event that failed processing
		s.store.DelEvent(e.ID())
		return err
	}
	// processing succeeded — make the index additions durable
	s.dagIndexer.Flush()
	return nil
}
// processEventHeads maintains the set of "heads" (events with no known
// descendants): the new event's parents are no longer heads, while the event
// itself becomes one.
func processEventHeads(heads *concurrent.EventsSet, e *inter.EventPayload) *concurrent.EventsSet {
	heads.Lock()
	defer heads.Unlock()
	parents := e.Parents()
	heads.Val.Erase(parents...)
	heads.Val.Add(e.ID())
	return heads
}
// processLastEvent records the event as its creator's most recent one. Forks
// are deliberately ignored, since this index is consumed only by the emitter.
func processLastEvent(lasts *concurrent.ValidatorEventsSet, e *inter.EventPayload) *concurrent.ValidatorEventsSet {
	lasts.Lock()
	defer lasts.Unlock()
	creator := e.Creator()
	lasts.Val[creator] = e.ID()
	return lasts
}
// processEvent extends the engine.Process with gossip-specific actions on each event processing
// processEvent extends the engine.Process with gossip-specific actions on each event processing
func (s *Service) processEvent(e *inter.EventPayload) error {
	// s.engineMu is locked here
	if s.stopped {
		return errStopped
	}
	// flag the service as busy with event processing for the duration of the call
	atomic.StoreUint32(&s.eventBusyFlag, 1)
	defer atomic.StoreUint32(&s.eventBusyFlag, 0)
	// repeat the checks under the mutex which may depend on volatile data
	if s.store.HasEvent(e.ID()) {
		return eventcheck.ErrAlreadyConnectedEvent
	}
	if err := s.checkers.Epochcheck.Validate(e); err != nil {
		return err
	}
	// snapshot the epoch BEFORE processing; s.engine.Process may advance it
	oldEpoch := s.store.GetEpoch()
	es := s.store.GetEpochState()
	// check prev epoch hash
	if e.PrevEpochHash() != nil {
		if *e.PrevEpochHash() != es.Hash() {
			// NOTE(review): the event hasn't been saved yet at this point
			// (saveAndProcessEvent runs below), so DelEvent here is presumably
			// a defensive no-op — confirm against store semantics
			s.store.DelEvent(e.ID())
			return errWrongEpochHash
		}
	}
	err := s.saveAndProcessEvent(e, &es)
	if err != nil {
		return err
	}
	// re-read the epoch to detect an epoch transition triggered by processing
	newEpoch := s.store.GetEpoch()
	// index DAG heads and last events (still under the pre-processing epoch)
	s.store.SetHeads(oldEpoch, processEventHeads(s.store.GetHeads(oldEpoch), e))
	s.store.SetLastEvents(oldEpoch, processLastEvent(s.store.GetLastEvents(oldEpoch), e))
	// update highest Lamport; reset to zero on an epoch change
	if newEpoch != oldEpoch {
		s.store.SetHighestLamport(0)
	} else if e.Lamport() > s.store.GetHighestLamport() {
		s.store.SetHighestLamport(e.Lamport())
	}
	s.emitter.OnEventConnected(e)
	if newEpoch != oldEpoch {
		// reset dag indexer
		s.store.resetEpochStore(newEpoch)
		es := s.store.getEpochStore(newEpoch)
		s.dagIndexer.Reset(s.store.GetValidators(), es.table.DagIndex, func(id hash.Event) dag.Event {
			return s.store.GetEvent(id)
		})
		// notify event checkers about new validation data
		s.gasPowerCheckReader.Ctx.Store(NewGasPowerContext(s.store, s.store.GetValidators(), newEpoch, s.store.GetRules().Economy)) // read gaspower check data from disk
		s.heavyCheckReader.Addrs.Store(NewEpochPubKeys(s.store, newEpoch))
		// notify about new epoch
		s.emitter.OnNewEpoch(s.store.GetValidators(), newEpoch)
		s.feed.newEpoch.Send(newEpoch)
	}
	// commit the store if due; wait for in-flight block processing first
	if s.store.IsCommitNeeded(newEpoch != oldEpoch) {
		s.blockProcWg.Wait()
		return s.store.Commit()
	}
	return nil
}
// uniqueID is a monotonically increasing counter used to stamp locally-built
// events with distinct IDs.
type uniqueID struct {
	counter *big.Int
}

// sample increments the counter and returns its big-endian bytes packed into
// the start of a 24-byte array.
func (u *uniqueID) sample() [24]byte {
	var id [24]byte
	u.counter = u.counter.Add(u.counter, common.Big1)
	copy(id[:], u.counter.Bytes())
	return id
}
// DagProcessor exposes the protocol manager's DAG processor.
func (s *Service) DagProcessor() *dagprocessor.Processor {
	return s.pm.processor
}