/
history_liquidity_pools.go
163 lines (137 loc) · 5.36 KB
/
history_liquidity_pools.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package history
import (
"context"
"sort"
sq "github.com/Masterminds/squirrel"
"github.com/TosinShada/stellar-core/support/db"
"github.com/TosinShada/stellar-core/support/errors"
)
// QHistoryLiquidityPools defines liquidity pool related queries.
type QHistoryLiquidityPools interface {
// CreateHistoryLiquidityPools creates history_liquidity_pools rows for the
// given pool ids and returns a mapping of pool id to internal id.
CreateHistoryLiquidityPools(ctx context.Context, poolIDs []string, batchSize int) (map[string]int64, error)
// NewOperationLiquidityPoolBatchInsertBuilder returns a batch insert builder
// for the history_operation_liquidity_pools table.
NewOperationLiquidityPoolBatchInsertBuilder(maxBatchSize int) OperationLiquidityPoolBatchInsertBuilder
// NewTransactionLiquidityPoolBatchInsertBuilder returns a batch insert
// builder for the history_transaction_liquidity_pools table.
NewTransactionLiquidityPoolBatchInsertBuilder(maxBatchSize int) TransactionLiquidityPoolBatchInsertBuilder
}
// CreateHistoryLiquidityPools creates rows in the history_liquidity_pools
// table for a given list of liquidity pool ids and returns a mapping of each
// id to its corresponding internal id in the history_liquidity_pools table.
//
// poolIDs is sorted in place by this call and duplicate ids are inserted only
// once. batchSize is handed to the insert builder as its MaxBatchSize. The
// insert uses "ON CONFLICT (liquidity_pool_id) DO NOTHING", and the returned
// mapping is built from a follow-up select, so ids that already had a row are
// included in the result as well. When poolIDs is empty, a nil map and a nil
// error are returned.
func (q *Q) CreateHistoryLiquidityPools(ctx context.Context, poolIDs []string, batchSize int) (map[string]int64, error) {
	if len(poolIDs) == 0 {
		return nil, nil
	}

	builder := &db.BatchInsertBuilder{
		Table:        q.GetTable("history_liquidity_pools"),
		MaxBatchSize: batchSize,
		Suffix:       "ON CONFLICT (liquidity_pool_id) DO NOTHING",
	}

	// sort before inserting to prevent deadlocks on acquiring a ShareLock
	// https://github.com/TosinShada/stellar-core/issues/2370
	sort.Strings(poolIDs)

	// Sorting places equal ids next to each other, so comparing with the
	// previous element is enough to drop duplicates.
	deduped := make([]string, 0, len(poolIDs))
	for i, id := range poolIDs {
		if i > 0 && id == poolIDs[i-1] {
			// skip duplicates
			continue
		}
		deduped = append(deduped, id)
		err := builder.Row(ctx, map[string]interface{}{
			"liquidity_pool_id": id,
		})
		if err != nil {
			return nil, errors.Wrap(err, "could not insert history_liquidity_pools row")
		}
	}

	if err := builder.Exec(ctx); err != nil {
		return nil, errors.Wrap(err, "could not exec liquidity pool insert builder")
	}

	// Read the internal ids back in bounded chunks so a single IN (...) list
	// never holds more than selectBatchSize ids.
	const selectBatchSize = 10000
	toInternalID := make(map[string]int64, len(deduped))
	for i := 0; i < len(deduped); i += selectBatchSize {
		end := i + selectBatchSize
		if end > len(deduped) {
			end = len(deduped)
		}

		lps, err := q.LiquidityPoolsByIDs(ctx, deduped[i:end])
		if err != nil {
			return nil, errors.Wrap(err, "could not select liquidity pools")
		}
		for _, lp := range lps {
			toInternalID[lp.PoolID] = lp.InternalID
		}
	}

	return toInternalID, nil
}
// HistoryLiquidityPool is a row of data from the `history_liquidity_pools` table.
type HistoryLiquidityPool struct {
// PoolID maps to the liquidity_pool_id column.
PoolID string `db:"liquidity_pool_id"`
// InternalID maps to the id column.
InternalID int64 `db:"id"`
}
// selectHistoryLiquidityPool is the base query that selects every column of
// history_liquidity_pools (aliased as hlp); the lookup methods in this file
// add their filters to it.
var selectHistoryLiquidityPool = sq.Select("hlp.*").From("history_liquidity_pools hlp")
// LiquidityPoolsByIDs fetches the `history_liquidity_pools` rows whose
// liquidity_pool_id is one of poolIDs.
func (q *Q) LiquidityPoolsByIDs(ctx context.Context, poolIDs []string) (dest []HistoryLiquidityPool, err error) {
	// A slice value in the predicate map is rendered as
	// hlp.liquidity_pool_id IN (...).
	filter := map[string]interface{}{
		"hlp.liquidity_pool_id": poolIDs,
	}
	query := selectHistoryLiquidityPool.Where(filter)

	err = q.Select(ctx, &dest, query)
	return dest, err
}
// LiquidityPoolByID fetches the single `history_liquidity_pools` row whose
// liquidity_pool_id equals poolID.
func (q *Q) LiquidityPoolByID(ctx context.Context, poolID string) (dest HistoryLiquidityPool, err error) {
	query := selectHistoryLiquidityPool.
		Where("hlp.liquidity_pool_id = ?", poolID).
		Limit(1)

	err = q.Get(ctx, &dest, query)
	return dest, err
}
// OperationLiquidityPoolBatchInsertBuilder is used to insert rows linking
// operations to liquidity pools in batches.
type OperationLiquidityPoolBatchInsertBuilder interface {
// Add queues a row pairing an operation id with a liquidity pool internal id.
Add(ctx context.Context, operationID, internalID int64) error
// Exec flushes the queued rows.
Exec(ctx context.Context) error
}
// operationLiquidityPoolBatchInsertBuilder implements
// OperationLiquidityPoolBatchInsertBuilder on top of a db.BatchInsertBuilder.
type operationLiquidityPoolBatchInsertBuilder struct {
// builder is the underlying batch insert builder that rows are delegated to.
builder db.BatchInsertBuilder
}
// NewOperationLiquidityPoolBatchInsertBuilder constructs a new
// OperationLiquidityPoolBatchInsertBuilder targeting the
// history_operation_liquidity_pools table, with maxBatchSize as the
// underlying builder's MaxBatchSize.
func (q *Q) NewOperationLiquidityPoolBatchInsertBuilder(maxBatchSize int) OperationLiquidityPoolBatchInsertBuilder {
	inserter := &operationLiquidityPoolBatchInsertBuilder{}
	inserter.builder.Table = q.GetTable("history_operation_liquidity_pools")
	inserter.builder.MaxBatchSize = maxBatchSize
	return inserter
}
// Add adds a new operation liquidity pool row to the batch, pairing
// operationID with the liquidity pool's internalID.
func (i *operationLiquidityPoolBatchInsertBuilder) Add(ctx context.Context, operationID, internalID int64) error {
	row := map[string]interface{}{
		"history_operation_id":      operationID,
		"history_liquidity_pool_id": internalID,
	}
	return i.builder.Row(ctx, row)
}
// Exec flushes all pending operation liquidity pool rows to the db by
// delegating to the underlying batch insert builder.
func (i *operationLiquidityPoolBatchInsertBuilder) Exec(ctx context.Context) error {
return i.builder.Exec(ctx)
}
// TransactionLiquidityPoolBatchInsertBuilder is used to insert rows linking
// transactions to liquidity pools in batches.
type TransactionLiquidityPoolBatchInsertBuilder interface {
// Add queues a row pairing a transaction id with a liquidity pool internal id.
Add(ctx context.Context, transactionID, internalID int64) error
// Exec flushes the queued rows.
Exec(ctx context.Context) error
}
// transactionLiquidityPoolBatchInsertBuilder implements
// TransactionLiquidityPoolBatchInsertBuilder on top of a db.BatchInsertBuilder.
type transactionLiquidityPoolBatchInsertBuilder struct {
// builder is the underlying batch insert builder that rows are delegated to.
builder db.BatchInsertBuilder
}
// NewTransactionLiquidityPoolBatchInsertBuilder constructs a new
// TransactionLiquidityPoolBatchInsertBuilder targeting the
// history_transaction_liquidity_pools table, with maxBatchSize as the
// underlying builder's MaxBatchSize.
func (q *Q) NewTransactionLiquidityPoolBatchInsertBuilder(maxBatchSize int) TransactionLiquidityPoolBatchInsertBuilder {
	inserter := &transactionLiquidityPoolBatchInsertBuilder{}
	inserter.builder.Table = q.GetTable("history_transaction_liquidity_pools")
	inserter.builder.MaxBatchSize = maxBatchSize
	return inserter
}
// Add adds a new transaction liquidity pool row to the batch, pairing
// transactionID with the liquidity pool's internalID.
func (i *transactionLiquidityPoolBatchInsertBuilder) Add(ctx context.Context, transactionID, internalID int64) error {
	row := map[string]interface{}{
		"history_transaction_id":    transactionID,
		"history_liquidity_pool_id": internalID,
	}
	return i.builder.Row(ctx, row)
}
// Exec flushes all pending transaction liquidity pool rows to the db by
// delegating to the underlying batch insert builder.
func (i *transactionLiquidityPoolBatchInsertBuilder) Exec(ctx context.Context) error {
return i.builder.Exec(ctx)
}