-
Notifications
You must be signed in to change notification settings - Fork 22
/
manager.go
220 lines (197 loc) · 6.6 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
// Copyright (C) 2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package manager
import (
"context"
"encoding/json"
"sync"
"time"
"github.com/AnomalyFi/hypersdk/crypto/ed25519"
"github.com/AnomalyFi/hypersdk/pubsub"
"github.com/AnomalyFi/hypersdk/rpc"
"github.com/AnomalyFi/hypersdk/utils"
"github.com/AnomalyFi/nodekit-seq/actions"
"github.com/AnomalyFi/nodekit-seq/auth"
"github.com/AnomalyFi/nodekit-seq/cmd/token-feed/config"
"github.com/AnomalyFi/nodekit-seq/consts"
trpc "github.com/AnomalyFi/nodekit-seq/rpc"
tutils "github.com/AnomalyFi/nodekit-seq/utils"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/timer"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)
// FeedContent is the JSON payload carried in the memo of a transfer sent to
// the feed recipient.
type FeedContent struct {
	// Message is the text of the post; messages with an empty Message are
	// not added to the feed.
	Message string `json:"message"`

	// URL is a link attached to the message. It is stored as received and is
	// not verified.
	URL string `json:"url"`
}
// FeedObject is a single accepted feed entry: a parsed message together with
// details of the transfer that paid for it.
type FeedObject struct {
	// Address is the formatted address of the account that sent the transfer.
	Address string `json:"address"`

	// TxID is the ID of the transaction that carried the message.
	TxID ids.ID `json:"txID"`

	// Timestamp is the timestamp of the block containing the transaction.
	Timestamp int64 `json:"timestamp"`

	// Fee is the value transferred to the recipient with the message.
	Fee uint64 `json:"fee"`

	// Content is the message decoded from the transfer memo.
	Content *FeedContent `json:"content"`
}
// Manager watches accepted blocks for paid messages sent to the configured
// recipient, keeps the most recent ones in an in-memory feed, and adjusts the
// fee required to post according to how many messages arrive per epoch.
type Manager struct {
	log    logging.Logger
	config *config.Config

	// tcli is the JSON-RPC client used to obtain the transaction parser.
	tcli *trpc.JSONRPCClient

	// l guards the fee/epoch state below.
	l sync.RWMutex
	// t fires updateFee once an epoch has elapsed.
	t *timer.Timer
	// epochStart is the Unix time (seconds) at which the current epoch began.
	epochStart int64
	// epochMessages counts the messages accepted in the current epoch.
	epochMessages int
	// feeAmount is the minimum transfer value currently required to post.
	feeAmount uint64

	// f guards feed.
	f sync.RWMutex
	// TODO: persist this
	//
	// feed holds accepted messages, newest first.
	feed []*FeedObject
}
// New builds a Manager for the feed described by config.
//
// It queries the RPC endpoint at config.TokenRPC for the network and chain
// IDs (returning any error from that call), starts the first epoch at the
// current time with the fee set to config.MinFee, and creates the fee-update
// timer. The timer is created but not started here.
func New(logger logging.Logger, config *config.Config) (*Manager, error) {
	// Discover which network/chain the endpoint serves so the typed client
	// can be constructed against it.
	baseClient := rpc.NewJSONRPCClient(config.TokenRPC)
	networkID, _, chainID, err := baseClient.Network(context.TODO())
	if err != nil {
		return nil, err
	}

	m := &Manager{
		log:        logger,
		config:     config,
		tcli:       trpc.NewJSONRPCClient(config.TokenRPC, networkID, chainID),
		feed:       []*FeedObject{},
		epochStart: time.Now().Unix(),
		feeAmount:  config.MinFee,
	}
	m.log.Info(
		"feed initialized",
		zap.String("address", m.config.Recipient),
		zap.String("fee", utils.FormatBalance(m.feeAmount, consts.Decimals)),
	)

	// The timer needs the manager's method as its callback, so it can only
	// be attached once m exists.
	m.t = timer.NewTimer(m.updateFee)
	return m, nil
}
// updateFee is the timer callback that closes out an epoch. If no messages
// arrived during the epoch and the fee is above the configured minimum, the
// fee is lowered by one FeeDelta. The epoch counters are then reset and the
// timer is re-armed for another TargetDurationPerEpoch seconds.
func (m *Manager) updateFee() {
	m.l.Lock()
	defer m.l.Unlock()

	// Block processing may have just started a fresh epoch. If less than
	// half of the target duration has passed since [epochStart], this firing
	// overlapped with such a reset and must not touch the fee.
	elapsed := time.Now().Unix() - m.epochStart
	if elapsed < m.config.TargetDurationPerEpoch/2 {
		return
	}

	// A quiet epoch lowers the fee, but never below the minimum.
	quietEpoch := m.epochMessages == 0
	if quietEpoch && m.feeAmount > m.config.MinFee {
		m.feeAmount -= m.config.FeeDelta
		m.log.Info("decreasing message fee", zap.Uint64("fee", m.feeAmount))
	}

	// Begin the next epoch and schedule its end.
	epochLength := time.Duration(m.config.TargetDurationPerEpoch) * time.Second
	m.epochMessages = 0
	m.epochStart = time.Now().Unix()
	m.t.SetTimeoutIn(epochLength)
}
// Run starts the fee-update timer and then processes blocks streamed from the
// token RPC websocket until ctx is canceled.
//
// Every successful transfer to the configured recipient that carries a
// non-empty memo, pays at least the current fee, and whose memo decodes to a
// FeedContent with a non-empty message is added to the front of the feed.
// If the websocket cannot be established or fails mid-stream, the client is
// closed and a new connection is attempted after a short delay.
//
// Run returns an error immediately if the transaction parser or the recipient
// public key cannot be obtained; otherwise it returns ctx.Err() once ctx is
// done.
func (m *Manager) Run(ctx context.Context) error {
	// Delay between reconnection attempts after a websocket failure.
	const retryDelay = 10 * time.Second

	// Start update timer
	m.t.SetTimeoutIn(time.Duration(m.config.TargetDurationPerEpoch) * time.Second)
	go m.t.Dispatch()
	defer m.t.Stop()

	parser, err := m.tcli.Parser(ctx)
	if err != nil {
		return err
	}
	recipientPubKey, err := m.config.RecipientPublicKey()
	if err != nil {
		return err
	}

	for ctx.Err() == nil { // handle WS client failure
		scli, err := rpc.NewWebSocketClient(m.config.TokenRPC, rpc.DefaultHandshakeTimeout, pubsub.MaxPendingMessages, pubsub.MaxReadMessageSize)
		if err != nil {
			m.log.Warn("unable to connect to RPC", zap.String("uri", m.config.TokenRPC), zap.Error(err))
			waitForRetry(ctx, retryDelay)
			continue
		}
		if err := scli.RegisterBlocks(); err != nil {
			m.log.Warn("unable to connect to register for blocks", zap.String("uri", m.config.TokenRPC), zap.Error(err))
			// Best-effort: the connection is being abandoned either way.
			_ = scli.Close()
			waitForRetry(ctx, retryDelay)
			continue
		}

		for ctx.Err() == nil {
			// Listen for blocks
			blk, results, _, _, err := scli.ListenBlock(ctx, parser)
			if err != nil {
				m.log.Warn("unable to listen for blocks", zap.Error(err))
				break
			}

			// Look for transactions to recipient
			for i, tx := range blk.Txs {
				action, ok := tx.Action.(*actions.Transfer)
				if !ok || action.To != recipientPubKey || len(action.Memo) == 0 {
					continue
				}
				m.recordTransfer(
					tutils.Address(auth.GetActor(tx.Auth)),
					tx.ID(),
					blk.Tmstmp,
					action,
					results[i].Success,
				)
			}
		}

		// Release the old connection before reconnecting (or exiting) so
		// repeated failures do not accumulate open sockets. The error is
		// ignored because the connection is discarded regardless.
		_ = scli.Close()

		// Sleep before trying again; returns early if ctx is canceled, in
		// which case the loop condition ends the function.
		waitForRetry(ctx, retryDelay)
	}
	return ctx.Err()
}

// recordTransfer validates one memo-carrying transfer to the recipient and,
// if it qualifies, prepends it to the feed and updates the epoch/fee state.
//
// addr is the sender's formatted address, txID and timestamp identify the
// transaction and its block, and success reports whether the transaction
// succeeded on-chain. m.l is held for the whole call so the fee that is
// compared and logged cannot change underneath it (updateFee writes the fee
// from the timer goroutine).
func (m *Manager) recordTransfer(addr string, txID ids.ID, timestamp int64, action *actions.Transfer, success bool) {
	m.l.Lock()
	defer m.l.Unlock()

	memo := string(action.Memo)
	if !success {
		m.log.Info("incoming message failed on-chain", zap.String("from", addr), zap.String("memo", memo), zap.Uint64("payment", action.Value), zap.Uint64("required", m.feeAmount))
		return
	}
	if action.Value < m.feeAmount {
		m.log.Info("incoming message did not pay enough", zap.String("from", addr), zap.String("memo", memo), zap.Uint64("payment", action.Value), zap.Uint64("required", m.feeAmount))
		return
	}

	var c FeedContent
	if err := json.Unmarshal(action.Memo, &c); err != nil {
		m.log.Info("incoming message could not be parsed", zap.String("from", addr), zap.String("memo", memo), zap.Uint64("payment", action.Value), zap.Error(err))
		return
	}
	if len(c.Message) == 0 {
		m.log.Info("incoming message was empty", zap.String("from", addr), zap.String("memo", memo), zap.Uint64("payment", action.Value))
		return
	}
	// TODO: pre-verify URLs

	// Newest entries go first; the feed lock is taken after m.l, matching
	// the order used everywhere else both locks are needed.
	m.f.Lock()
	m.feed = append([]*FeedObject{{
		Address:   addr,
		TxID:      txID,
		Timestamp: timestamp,
		Fee:       action.Value,
		Content:   &c,
	}}, m.feed...)
	if len(m.feed) > m.config.FeedSize {
		// TODO: do this more efficiently using a rolling window
		m.feed[m.config.FeedSize] = nil // prevent memory leak
		m.feed = m.feed[:m.config.FeedSize]
	}
	m.f.Unlock()

	// Enough messages in one epoch raises the fee and starts a new epoch
	// immediately, so the pending timer is replaced rather than left to fire.
	m.epochMessages++
	if m.epochMessages >= m.config.MessagesPerEpoch {
		m.feeAmount += m.config.FeeDelta
		m.log.Info("increasing message fee", zap.Uint64("fee", m.feeAmount))
		m.epochMessages = 0
		m.epochStart = time.Now().Unix()
		m.t.Cancel()
		m.t.SetTimeoutIn(time.Duration(m.config.TargetDurationPerEpoch) * time.Second)
	}
	m.log.Info("received incoming message", zap.String("from", addr), zap.String("memo", memo), zap.Uint64("payment", action.Value), zap.Uint64("new required", m.feeAmount))
}

// waitForRetry blocks for d, or until ctx is done if that happens first.
func waitForRetry(ctx context.Context, d time.Duration) {
	wait := time.NewTimer(d)
	defer wait.Stop()
	select {
	case <-ctx.Done():
	case <-wait.C:
	}
}
// GetFeedInfo reports the recipient public key that messages must be sent to
// and the fee currently required to post. The error, if any, comes from
// resolving the recipient public key from the configuration; the fee is
// returned alongside it regardless.
func (m *Manager) GetFeedInfo(_ context.Context) (ed25519.PublicKey, uint64, error) {
	m.l.RLock()
	defer m.l.RUnlock()

	recipient, err := m.config.RecipientPublicKey()
	currentFee := m.feeAmount
	return recipient, currentFee, err
}
// GetFeed returns a copy of the current feed slice, so callers may reorder or
// truncate the result without affecting the manager. The FeedObject pointers
// themselves are shared, not deep-copied. The returned error is always nil.
//
// TODO: allow for multiple feeds
func (m *Manager) GetFeed(context.Context) ([]*FeedObject, error) {
	m.f.RLock()
	defer m.f.RUnlock()

	snapshot := slices.Clone(m.feed)
	return snapshot, nil
}