-
-
Notifications
You must be signed in to change notification settings - Fork 277
/
reward.go
199 lines (164 loc) · 5.16 KB
/
reward.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
package service
import (
"context"
"fmt"
"strconv"
"strings"
"time"
sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
exchange2 "github.com/c9s/bbgo/pkg/exchange"
"github.com/c9s/bbgo/pkg/exchange/batch"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
// RewardService collects the reward records from the exchange,
// currently it's only available for MAX exchange.
// TODO: add summary query for calculating the reward amounts
// CREATE VIEW reward_summary_by_years AS SELECT YEAR(created_at) as year, reward_type, currency, SUM(quantity) FROM rewards WHERE reward_type != 'airdrop' GROUP BY YEAR(created_at), reward_type, currency ORDER BY year DESC;
type RewardService struct {
DB *sqlx.DB
}
func (s *RewardService) Sync(ctx context.Context, exchange types.Exchange, startTime time.Time) error {
api, ok := exchange.(types.ExchangeRewardService)
if !ok {
return ErrExchangeRewardServiceNotImplemented
}
isMargin, isFutures, _, _ := exchange2.GetSessionAttributes(exchange)
if isMargin || isFutures {
return nil
}
tasks := []SyncTask{
{
Type: types.Reward{},
Select: SelectLastRewards(exchange.Name(), 100),
BatchQuery: func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) {
query := &batch.RewardBatchQuery{
Service: api,
}
return query.Query(ctx, startTime, endTime)
},
Time: func(obj interface{}) time.Time {
return obj.(types.Reward).CreatedAt.Time()
},
ID: func(obj interface{}) string {
reward := obj.(types.Reward)
return string(reward.Type) + "_" + reward.UUID
},
LogInsert: true,
},
}
for _, sel := range tasks {
if err := sel.execute(ctx, s.DB, startTime); err != nil {
return err
}
}
return nil
}
type CurrencyPositionMap map[string]fixedpoint.Value
func (s *RewardService) AggregateUnspentCurrencyPosition(ctx context.Context, ex types.ExchangeName, since time.Time) (CurrencyPositionMap, error) {
m := make(CurrencyPositionMap)
rewards, err := s.QueryUnspentSince(ctx, ex, since)
if err != nil {
return nil, err
}
for _, reward := range rewards {
m[reward.Currency] = m[reward.Currency].Add(reward.Quantity)
}
return m, nil
}
func (s *RewardService) QueryUnspentSince(ctx context.Context, ex types.ExchangeName, since time.Time, rewardTypes ...types.RewardType) ([]types.Reward, error) {
sql := "SELECT * FROM rewards WHERE created_at >= :since AND exchange = :exchange AND spent IS FALSE "
if len(rewardTypes) == 0 {
sql += " AND `reward_type` NOT IN ('airdrop') "
} else {
var args []string
for _, n := range rewardTypes {
args = append(args, strconv.Quote(string(n)))
}
sql += " AND `reward_type` IN (" + strings.Join(args, ", ") + ") "
}
sql += " ORDER BY created_at ASC"
rows, err := s.DB.NamedQueryContext(ctx, sql, map[string]interface{}{
"exchange": ex,
"since": since,
})
if err != nil {
return nil, err
}
defer rows.Close()
return s.scanRows(rows)
}
func (s *RewardService) QueryUnspent(ctx context.Context, ex types.ExchangeName, rewardTypes ...types.RewardType) ([]types.Reward, error) {
sql := "SELECT * FROM rewards WHERE exchange = :exchange AND spent IS FALSE "
if len(rewardTypes) == 0 {
sql += " AND `reward_type` NOT IN ('airdrop') "
} else {
var args []string
for _, n := range rewardTypes {
args = append(args, strconv.Quote(string(n)))
}
sql += " AND `reward_type` IN (" + strings.Join(args, ", ") + ") "
}
sql += " ORDER BY created_at ASC"
rows, err := s.DB.NamedQueryContext(ctx, sql, map[string]interface{}{
"exchange": ex,
})
if err != nil {
return nil, err
}
defer rows.Close()
return s.scanRows(rows)
}
func (s *RewardService) MarkCurrencyAsSpent(ctx context.Context, currency string) error {
result, err := s.DB.NamedExecContext(ctx, "UPDATE `rewards` SET `spent` = TRUE WHERE `currency` = :currency AND `spent` IS FALSE", map[string]interface{}{
"currency": currency,
})
if err != nil {
return err
}
_, err = result.RowsAffected()
return err
}
func (s *RewardService) MarkAsSpent(ctx context.Context, uuid string) error {
result, err := s.DB.NamedExecContext(ctx, "UPDATE `rewards` SET `spent` = TRUE WHERE `uuid` = :uuid", map[string]interface{}{
"uuid": uuid,
})
if err != nil {
return err
}
cnt, err := result.RowsAffected()
if err != nil {
return err
}
if cnt == 0 {
return fmt.Errorf("reward uuid:%s not found", uuid)
}
return nil
}
func (s *RewardService) scanRows(rows *sqlx.Rows) (rewards []types.Reward, err error) {
for rows.Next() {
var reward types.Reward
if err := rows.StructScan(&reward); err != nil {
return rewards, err
}
rewards = append(rewards, reward)
}
return rewards, rows.Err()
}
func (s *RewardService) Insert(reward types.Reward) error {
_, err := s.DB.NamedExec(`
INSERT INTO rewards (exchange, uuid, reward_type, currency, quantity, state, note, created_at)
VALUES (:exchange, :uuid, :reward_type, :currency, :quantity, :state, :note, :created_at)`,
reward)
return err
}
func SelectLastRewards(ex types.ExchangeName, limit uint64) sq.SelectBuilder {
return sq.Select("*").
From("rewards").
Where(sq.And{
sq.Eq{"exchange": ex},
}).
OrderBy("created_at DESC").
Limit(limit)
}