/
accounts_processor.go
96 lines (79 loc) · 2.14 KB
/
accounts_processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package processors
import (
"github.com/AnneNamuli/go-stellar/ingest"
"github.com/AnneNamuli/go-stellar/services/horizon/internal/db2/history"
"github.com/AnneNamuli/go-stellar/support/errors"
"github.com/AnneNamuli/go-stellar/xdr"
)
type AccountsProcessor struct {
accountsQ history.QAccounts
cache *ingest.LedgerEntryChangeCache
}
func NewAccountsProcessor(accountsQ history.QAccounts) *AccountsProcessor {
p := &AccountsProcessor{accountsQ: accountsQ}
p.reset()
return p
}
func (p *AccountsProcessor) reset() {
p.cache = ingest.NewLedgerEntryChangeCache()
}
func (p *AccountsProcessor) ProcessChange(change ingest.Change) error {
if change.Type != xdr.LedgerEntryTypeAccount {
return nil
}
err := p.cache.AddChange(change)
if err != nil {
return errors.Wrap(err, "error adding to ledgerCache")
}
if p.cache.Size() > maxBatchSize {
err = p.Commit()
if err != nil {
return errors.Wrap(err, "error in Commit")
}
p.reset()
}
return nil
}
func (p *AccountsProcessor) Commit() error {
batchUpsertAccounts := []xdr.LedgerEntry{}
changes := p.cache.GetChanges()
for _, change := range changes {
changed, err := change.AccountChangedExceptSigners()
if err != nil {
return errors.Wrap(err, "Error running change.AccountChangedExceptSigners")
}
if !changed {
continue
}
switch {
case change.Post != nil:
// Created and updated
batchUpsertAccounts = append(batchUpsertAccounts, *change.Post)
case change.Pre != nil && change.Post == nil:
// Removed
account := change.Pre.Data.MustAccount()
accountID := account.AccountId.Address()
rowsAffected, err := p.accountsQ.RemoveAccount(accountID)
if err != nil {
return err
}
if rowsAffected != 1 {
return ingest.NewStateError(errors.Errorf(
"%d No rows affected when removing account %s",
rowsAffected,
accountID,
))
}
default:
return errors.New("Invalid io.Change: change.Pre == nil && change.Post == nil")
}
}
// Upsert accounts
if len(batchUpsertAccounts) > 0 {
err := p.accountsQ.UpsertAccounts(batchUpsertAccounts)
if err != nil {
return errors.Wrap(err, "errors in UpsertAccounts")
}
}
return nil
}