-
Notifications
You must be signed in to change notification settings - Fork 1
/
single_ledger_session.go
75 lines (63 loc) · 1.85 KB
/
single_ledger_session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package ingest
import (
"github.com/diamnet/go/exp/ingest/adapters"
"github.com/diamnet/go/exp/ingest/io"
"github.com/diamnet/go/support/errors"
)
var _ Session = &SingleLedgerSession{}
func (s *SingleLedgerSession) Run() error {
s.setRunningState(true)
defer s.setRunningState(false)
s.shutdown = make(chan bool)
historyAdapter := adapters.MakeHistoryArchiveAdapter(s.Archive)
var err error
sequence := s.LedgerSequence
if sequence == 0 {
sequence, err = historyAdapter.GetLatestLedgerSequence()
if err != nil {
return errors.Wrap(err, "Error getting the latest ledger sequence")
}
}
err = s.processState(historyAdapter, sequence)
if err != nil {
return errors.Wrap(err, "processState errored")
}
s.standardSession.latestSuccessfullyProcessedLedger = sequence
return nil
}
func (s *SingleLedgerSession) Resume(ledgerSequence uint32) error {
panic("Not possible to resume SingleLedgerSession")
}
func (s *SingleLedgerSession) processState(historyAdapter *adapters.HistoryArchiveAdapter, sequence uint32) error {
var tempSet io.TempSet = &io.MemoryTempSet{}
if s.TempSet != nil {
tempSet = s.TempSet
}
stateReader, err := historyAdapter.GetState(sequence, tempSet)
if err != nil {
return errors.Wrap(err, "Error getting state from history archive")
}
if s.StateReporter != nil {
s.StateReporter.OnStartState(sequence)
stateReader = reporterStateReader{stateReader, s.StateReporter}
}
errChan := s.StatePipeline.Process(stateReader)
select {
case err := <-errChan:
if err != nil {
if s.StateReporter != nil {
s.StateReporter.OnEndState(err, false)
}
return errors.Wrap(err, "State pipeline errored")
}
case <-s.shutdown:
if s.StateReporter != nil {
s.StateReporter.OnEndState(nil, true)
}
s.StatePipeline.Shutdown()
}
if s.StateReporter != nil {
s.StateReporter.OnEndState(nil, false)
}
return nil
}