forked from stellar/go
/
offers_processor.go
147 lines (125 loc) · 3.93 KB
/
offers_processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package processors
import (
"context"
"github.com/stellar/go/ingest"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)
// offerCompactionWindow is the number of ledgers an offer row is kept after
// being marked for deletion. The offers processor trims the offers table by
// removing all offer rows which were marked for deletion at least 100 ledgers
// ago: Commit subtracts this value from the processor's ledger sequence and
// passes the result to CompactOffers as the cutoff ledger.
const offerCompactionWindow = uint32(100)
// OffersProcessor buffers offer ledger entry changes and writes them out
// through a history.QOffers implementation in batches.
type OffersProcessor struct {
// offersQ is the query interface used to create the batch insert builder and
// to update, remove and compact offer rows.
offersQ history.QOffers
// sequence is the ledger sequence supplied at construction; it is passed to
// RemoveOffers and used to derive the compaction cutoff in Commit.
sequence uint32

// cache accumulates offer changes until the next flush.
cache *ingest.ChangeCompactor
// insertBatch collects rows for newly created offers until Exec is called.
insertBatch history.OffersBatchInsertBuilder
// removeBatch holds the IDs of removed offers awaiting a RemoveOffers call.
removeBatch []int64
}
// NewOffersProcessor returns an OffersProcessor that writes through offersQ
// and is bound to the given ledger sequence. The change cache, insert batch
// and remove batch are initialised before the processor is returned.
func NewOffersProcessor(offersQ history.QOffers, sequence uint32) *OffersProcessor {
	processor := new(OffersProcessor)
	processor.offersQ = offersQ
	processor.sequence = sequence
	// reset needs offersQ to be set already: it asks it for a batch builder.
	processor.reset()
	return processor
}
// reset discards all buffered state: it installs a new change compactor, a
// new batch insert builder sized by maxBatchSize, and an empty remove batch.
func (p *OffersProcessor) reset() {
	compactor := ingest.NewChangeCompactor()
	p.cache = compactor

	builder := p.offersQ.NewOffersBatchInsertBuilder(maxBatchSize)
	p.insertBatch = builder

	// Kept as an empty, non-nil slice.
	p.removeBatch = make([]int64, 0)
}
// ProcessChange buffers a single ledger entry change. Changes whose type is
// not xdr.LedgerEntryTypeOffer are ignored. Once the cache holds more than
// maxBatchSize entries it is flushed and the processor state is reset.
//
// It returns a wrapped error if the change cannot be added to the cache or if
// the flush fails.
func (p *OffersProcessor) ProcessChange(ctx context.Context, change ingest.Change) error {
	if change.Type != xdr.LedgerEntryTypeOffer {
		return nil
	}

	addErr := p.cache.AddChange(change)
	if addErr != nil {
		return errors.Wrap(addErr, "error adding to ledgerCache")
	}

	// Nothing more to do until the cache outgrows the batch size.
	if p.cache.Size() <= maxBatchSize {
		return nil
	}

	flushErr := p.flushCache(ctx)
	if flushErr != nil {
		return errors.Wrap(flushErr, "error in Commit")
	}
	// Only clear buffered state after a successful flush.
	p.reset()
	return nil
}
// ledgerEntryToRow builds the history.Offer row for an offer ledger entry.
// The offer body is read with MustOffer, so the entry is expected to hold
// offer data. Price is stored both as its numerator/denominator pair and as
// their float64 quotient.
func (p *OffersProcessor) ledgerEntryToRow(entry *xdr.LedgerEntry) history.Offer {
	data := entry.Data.MustOffer()

	var row history.Offer
	row.SellerID = data.SellerId.Address()
	row.OfferID = int64(data.OfferId)
	row.SellingAsset = data.Selling
	row.BuyingAsset = data.Buying
	row.Amount = int64(data.Amount)

	numerator, denominator := data.Price.N, data.Price.D
	row.Pricen = int32(numerator)
	row.Priced = int32(denominator)
	row.Price = float64(numerator) / float64(denominator)

	row.Flags = int32(data.Flags)
	row.LastModifiedLedger = uint32(entry.LastModifiedLedgerSeq)
	row.Sponsor = ledgerEntrySponsorToNullString(*entry)
	return row
}
// flushCache writes every change currently held in the cache.
//
// Created offers (no Pre, has Post) are added to the insert batch, removed
// offers (has Pre, no Post) have their ID appended to the remove batch, and
// all other changes are applied immediately with UpdateOffer. After the loop
// the insert batch is executed and, if any removals were collected, they are
// passed to RemoveOffers together with the processor's sequence.
//
// An update that does not affect exactly one row is reported as an ingest
// state error. Errors from adding to the batch or from UpdateOffer are
// returned unwrapped.
func (p *OffersProcessor) flushCache(ctx context.Context) error {
	for _, change := range p.cache.GetChanges() {
		switch {
		case change.Pre == nil && change.Post != nil:
			// Created: row counts are not tracked when batch inserting.
			if err := p.insertBatch.Add(ctx, p.ledgerEntryToRow(change.Post)); err != nil {
				return err
			}

		case change.Pre != nil && change.Post == nil:
			// Removed: row counts are not tracked when batch removing.
			removed := change.Pre.Data.MustOffer()
			p.removeBatch = append(p.removeBatch, int64(removed.OfferId))

		default:
			// Updated: this is the only path that reports a row count.
			updated := change.Post.Data.MustOffer()
			affected, err := p.offersQ.UpdateOffer(ctx, p.ledgerEntryToRow(change.Post))
			if err != nil {
				return err
			}
			if affected != 1 {
				return ingest.NewStateError(errors.Errorf(
					"%d rows affected when %s offer %d",
					affected,
					"updating",
					updated.OfferId,
				))
			}
		}
	}

	if err := p.insertBatch.Exec(ctx); err != nil {
		return errors.Wrap(err, "error executing batch")
	}

	if len(p.removeBatch) == 0 {
		return nil
	}
	if _, err := p.offersQ.RemoveOffers(ctx, p.removeBatch, p.sequence); err != nil {
		return errors.Wrap(err, "error in RemoveOffers")
	}
	return nil
}
// Commit flushes any changes still held in the cache and then, when the
// processor's sequence is greater than offerCompactionWindow, trims the
// offers table by calling CompactOffers with the cutoff ledger
// sequence-offerCompactionWindow. The number of rows removed by the
// compaction is logged at info level.
//
// It returns a wrapped error if the flush or the compaction fails.
func (p *OffersProcessor) Commit(ctx context.Context) error {
	if err := p.flushCache(ctx); err != nil {
		return errors.Wrap(err, "error flushing cache")
	}

	// The unsigned subtraction below is only safe (and compaction only
	// happens) once the sequence is past the window.
	if p.sequence <= offerCompactionWindow {
		return nil
	}

	// trim offers table by removing offers which were deleted before the cutoff ledger
	offerRowsRemoved, err := p.offersQ.CompactOffers(ctx, p.sequence-offerCompactionWindow)
	if err != nil {
		return errors.Wrap(err, "could not compact offers")
	}
	log.WithField("offer_rows_removed", offerRowsRemoved).Info("Trimmed offers table")
	return nil
}