forked from stellar/go
/
trust_lines_processor.go
100 lines (84 loc) · 2.36 KB
/
trust_lines_processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package processors
import (
"context"
"github.com/stellar/go/ingest"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)
// TrustLinesProcessor collects trust line ledger entry changes in a change
// compactor and, on Commit, writes them out through trustLinesQ.
type TrustLinesProcessor struct {
// trustLinesQ is the store handle used to upsert and remove trust lines.
trustLinesQ history.QTrustLines
// cache holds the trust line changes added since the last reset.
cache *ingest.ChangeCompactor
}
// NewTrustLinesProcessor returns a TrustLinesProcessor that persists trust
// lines through trustLinesQ and starts out with a newly constructed change
// compactor as its cache.
func NewTrustLinesProcessor(trustLinesQ history.QTrustLines) *TrustLinesProcessor {
	return &TrustLinesProcessor{
		trustLinesQ: trustLinesQ,
		cache:       ingest.NewChangeCompactor(),
	}
}
// reset discards the current cache by replacing it with a newly constructed
// change compactor.
func (p *TrustLinesProcessor) reset() {
p.cache = ingest.NewChangeCompactor()
}
// ProcessChange records a single ledger entry change.
//
// Changes whose type is not a trust line are ignored. Trust line changes are
// added to the cache; once the cache holds more than maxBatchSize entries the
// accumulated changes are committed and the cache is reset.
//
// It returns a wrapped error if adding to the cache or committing fails.
func (p *TrustLinesProcessor) ProcessChange(ctx context.Context, change ingest.Change) error {
	// Only trust line entries are of interest to this processor.
	if change.Type != xdr.LedgerEntryTypeTrustline {
		return nil
	}

	if addErr := p.cache.AddChange(change); addErr != nil {
		return errors.Wrap(addErr, "error adding to ledgerCache")
	}

	// Below the batch threshold: keep accumulating.
	if p.cache.Size() <= maxBatchSize {
		return nil
	}

	if commitErr := p.Commit(ctx); commitErr != nil {
		return errors.Wrap(commitErr, "error in Commit")
	}

	// The batch was written successfully, so start a new one.
	p.reset()
	return nil
}
// Commit writes every change currently held in the cache to the database.
//
// Changes with a non-nil Post entry (created or updated trust lines) are
// collected and written with a single UpsertTrustLines call after all changes
// have been examined. Changes with only a Pre entry (removed trust lines) are
// deleted one at a time through RemoveTrustLine; a removal that does not affect
// exactly one row is reported as an ingest state error. A change with neither
// Pre nor Post set is rejected with an error.
//
// Commit does not reset the cache itself; ProcessChange does that after a
// successful batch commit.
func (p *TrustLinesProcessor) Commit(ctx context.Context) error {
	// A nil slice is sufficient: it is only appended to and length-checked.
	var batchUpsertTrustLines []xdr.LedgerEntry

	for _, change := range p.cache.GetChanges() {
		switch {
		case change.Post != nil:
			// Created and updated
			batchUpsertTrustLines = append(batchUpsertTrustLines, *change.Post)
		case change.Pre != nil:
			// Removed (Post is known to be nil here because the previous case
			// did not match).
			const action = "removing"

			trustLine := change.Pre.Data.MustTrustLine()

			var ledgerKey xdr.LedgerKey
			if err := ledgerKey.SetTrustline(trustLine.AccountId, trustLine.Asset); err != nil {
				return errors.Wrap(err, "Error creating ledger key")
			}

			rowsAffected, err := p.trustLinesQ.RemoveTrustLine(ctx, *ledgerKey.TrustLine)
			if err != nil {
				// Wrapped for context, consistent with the other error paths.
				return errors.Wrap(err, "error in RemoveTrustLine")
			}
			if rowsAffected != 1 {
				return ingest.NewStateError(errors.Errorf(
					"%d rows affected when %s trustline: %s %s",
					rowsAffected,
					action,
					ledgerKey.TrustLine.AccountId.Address(),
					// TODO fix before Protocol 18
					ledgerKey.TrustLine.Asset.ToAsset().String(),
				))
			}
		default:
			return errors.New("Invalid io.Change: change.Pre == nil && change.Post == nil")
		}
	}

	// Upsert trust lines
	if len(batchUpsertTrustLines) > 0 {
		if err := p.trustLinesQ.UpsertTrustLines(ctx, batchUpsertTrustLines); err != nil {
			return errors.Wrap(err, "errors in UpsertTrustLines")
		}
	}

	return nil
}