// File: history_claimable_balances.go

package history
import (
"context"
"sort"
sq "github.com/Masterminds/squirrel"
"github.com/TosinShada/monorepo/support/db"
"github.com/TosinShada/monorepo/support/errors"
)
// QHistoryClaimableBalances defines the queries related to history claimable
// balances: creating rows in history_claimable_balances and constructing the
// batch insert builders that associate operations and transactions with those
// rows.
type QHistoryClaimableBalances interface {
// CreateHistoryClaimableBalances takes claimable balance ids and a batch
// size and returns a map keyed by id with int64 values (the internal ids in
// the implementation provided by *Q).
CreateHistoryClaimableBalances(ctx context.Context, ids []string, batchSize int) (map[string]int64, error)
// NewOperationClaimableBalanceBatchInsertBuilder returns a builder for
// operation/claimable-balance rows; maxBatchSize is handed to the builder.
NewOperationClaimableBalanceBatchInsertBuilder(maxBatchSize int) OperationClaimableBalanceBatchInsertBuilder
// NewTransactionClaimableBalanceBatchInsertBuilder returns a builder for
// transaction/claimable-balance rows; maxBatchSize is handed to the builder.
NewTransactionClaimableBalanceBatchInsertBuilder(maxBatchSize int) TransactionClaimableBalanceBatchInsertBuilder
}
// CreateHistoryClaimableBalances creates rows in the history_claimable_balances
// table for a given list of ids and returns a mapping of each id to its
// corresponding internal id in that table.
//
// The insert uses "ON CONFLICT (claimable_balance_id) DO NOTHING", so ids that
// already have a row are not inserted again. The returned mapping is built by
// selecting the rows back by id after the insert, in chunks of at most
// selectBatchSize ids per query. batchSize is used as the insert builder's
// MaxBatchSize.
//
// The caller's ids slice is left untouched: sorting is done on a private copy.
// On any error a nil map is returned together with the wrapped error.
func (q *Q) CreateHistoryClaimableBalances(ctx context.Context, ids []string, batchSize int) (map[string]int64, error) {
	builder := &db.BatchInsertBuilder{
		Table:        q.GetTable("history_claimable_balances"),
		MaxBatchSize: batchSize,
		Suffix:       "ON CONFLICT (claimable_balance_id) DO NOTHING",
	}

	// Sort before inserting to prevent deadlocks on acquiring a ShareLock:
	// https://github.com/TosinShada/monorepo/issues/2370
	// sort.Strings sorts in place, so work on a copy to avoid reordering the
	// caller's slice as a hidden side effect.
	sorted := make([]string, len(ids))
	copy(sorted, ids)
	sort.Strings(sorted)

	for _, id := range sorted {
		err := builder.Row(ctx, map[string]interface{}{
			"claimable_balance_id": id,
		})
		if err != nil {
			return nil, errors.Wrap(err, "could not insert history_claimable_balances row")
		}
	}

	if err := builder.Exec(ctx); err != nil {
		return nil, errors.Wrap(err, "could not exec claimable balance insert builder")
	}

	// Read the internal ids back in bounded chunks so a single IN (...) list
	// never carries more than selectBatchSize values.
	const selectBatchSize = 10000
	toInternalID := make(map[string]int64, len(sorted))
	for start := 0; start < len(sorted); start += selectBatchSize {
		end := start + selectBatchSize
		if end > len(sorted) {
			end = len(sorted)
		}

		cbs, err := q.ClaimableBalancesByIDs(ctx, sorted[start:end])
		if err != nil {
			return nil, errors.Wrap(err, "could not select claimable balances")
		}
		for _, cb := range cbs {
			toInternalID[cb.BalanceID] = cb.InternalID
		}
	}

	return toInternalID, nil
}
// HistoryClaimableBalance is a row of data from the `history_claimable_balances` table
type HistoryClaimableBalance struct {
// BalanceID is read from the claimable_balance_id column.
BalanceID string `db:"claimable_balance_id"`
// InternalID is read from the id column; it is the value that
// CreateHistoryClaimableBalances reports for each balance id.
InternalID int64 `db:"id"`
}
// selectHistoryClaimableBalance is the base SELECT over
// history_claimable_balances (aliased as hcb) returning all of its columns.
// ClaimableBalancesByIDs and ClaimableBalanceByID add their filters to it.
var selectHistoryClaimableBalance = sq.Select("hcb.*").From("history_claimable_balances hcb")
// ClaimableBalancesByIDs fetches the `history_claimable_balances` rows whose
// claimable_balance_id is one of the given ids. The rows are scanned into dest
// and returned together with any error reported by the select.
func (q *Q) ClaimableBalancesByIDs(ctx context.Context, ids []string) (dest []HistoryClaimableBalance, err error) {
	// A slice value in the filter map yields hcb.claimable_balance_id IN (...).
	byID := map[string]interface{}{
		"hcb.claimable_balance_id": ids,
	}
	query := selectHistoryClaimableBalance.Where(byID)

	err = q.Select(ctx, &dest, query)
	return dest, err
}
// ClaimableBalanceByID fetches a single `history_claimable_balances` row
// matching the given claimable_balance_id. The query is limited to one row;
// dest and the error from q.Get are returned as-is.
func (q *Q) ClaimableBalanceByID(ctx context.Context, id string) (dest HistoryClaimableBalance, err error) {
	singleRow := selectHistoryClaimableBalance.Limit(1)
	query := singleRow.Where("hcb.claimable_balance_id = ?", id)

	err = q.Get(ctx, &dest, query)
	return dest, err
}
// OperationClaimableBalanceBatchInsertBuilder is the interface for batching
// inserts that pair an operation id with a history claimable balance internal
// id. The implementation in this file targets the
// history_operation_claimable_balances table.
type OperationClaimableBalanceBatchInsertBuilder interface {
// Add submits one (operationID, internalID) pair to the batch.
Add(ctx context.Context, operationID, internalID int64) error
// Exec executes the batch.
Exec(ctx context.Context) error
}
// operationClaimableBalanceBatchInsertBuilder implements
// OperationClaimableBalanceBatchInsertBuilder by delegating to a
// db.BatchInsertBuilder.
type operationClaimableBalanceBatchInsertBuilder struct {
// builder is the underlying batch insert builder that receives every row.
builder db.BatchInsertBuilder
}
// NewOperationClaimableBalanceBatchInsertBuilder constructs a batch insert
// builder for the history_operation_claimable_balances table, using
// maxBatchSize as the underlying builder's MaxBatchSize.
func (q *Q) NewOperationClaimableBalanceBatchInsertBuilder(maxBatchSize int) OperationClaimableBalanceBatchInsertBuilder {
	// Start from the zero value and fill in only the two configured fields.
	insertBuilder := new(operationClaimableBalanceBatchInsertBuilder)
	insertBuilder.builder.Table = q.GetTable("history_operation_claimable_balances")
	insertBuilder.builder.MaxBatchSize = maxBatchSize
	return insertBuilder
}
// Add hands a new operation claimable balance row to the batch: operationID is
// stored as history_operation_id and internalID as
// history_claimable_balance_id.
func (i *operationClaimableBalanceBatchInsertBuilder) Add(ctx context.Context, operationID, internalID int64) error {
	row := map[string]interface{}{
		"history_operation_id":         operationID,
		"history_claimable_balance_id": internalID,
	}
	return i.builder.Row(ctx, row)
}
// Exec writes all pending operation claimable balances to the db by executing
// the underlying batch insert builder, and reports its error unchanged.
func (i *operationClaimableBalanceBatchInsertBuilder) Exec(ctx context.Context) error {
	flushErr := i.builder.Exec(ctx)
	return flushErr
}
// TransactionClaimableBalanceBatchInsertBuilder is the interface for batching
// inserts that pair a transaction id with a history claimable balance internal
// id. The implementation in this file targets the
// history_transaction_claimable_balances table.
type TransactionClaimableBalanceBatchInsertBuilder interface {
// Add submits one (transactionID, internalID) pair to the batch.
Add(ctx context.Context, transactionID, internalID int64) error
// Exec executes the batch.
Exec(ctx context.Context) error
}
// transactionClaimableBalanceBatchInsertBuilder implements
// TransactionClaimableBalanceBatchInsertBuilder by delegating to a
// db.BatchInsertBuilder.
type transactionClaimableBalanceBatchInsertBuilder struct {
// builder is the underlying batch insert builder that receives every row.
builder db.BatchInsertBuilder
}
// NewTransactionClaimableBalanceBatchInsertBuilder constructs a batch insert
// builder for the history_transaction_claimable_balances table, using
// maxBatchSize as the underlying builder's MaxBatchSize.
func (q *Q) NewTransactionClaimableBalanceBatchInsertBuilder(maxBatchSize int) TransactionClaimableBalanceBatchInsertBuilder {
	// Start from the zero value and fill in only the two configured fields.
	insertBuilder := new(transactionClaimableBalanceBatchInsertBuilder)
	insertBuilder.builder.Table = q.GetTable("history_transaction_claimable_balances")
	insertBuilder.builder.MaxBatchSize = maxBatchSize
	return insertBuilder
}
// Add hands a new transaction claimable balance row to the batch:
// transactionID is stored as history_transaction_id and internalID as
// history_claimable_balance_id.
func (i *transactionClaimableBalanceBatchInsertBuilder) Add(ctx context.Context, transactionID, internalID int64) error {
	row := map[string]interface{}{
		"history_transaction_id":       transactionID,
		"history_claimable_balance_id": internalID,
	}
	return i.builder.Row(ctx, row)
}
// Exec writes all pending transaction claimable balances to the db by
// executing the underlying batch insert builder, and reports its error
// unchanged.
func (i *transactionClaimableBalanceBatchInsertBuilder) Exec(ctx context.Context) error {
	flushErr := i.builder.Exec(ctx)
	return flushErr
}