-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
eth_sub.go
71 lines (54 loc) · 1.66 KB
/
eth_sub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package gateway
import (
"context"
"sync"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types/ethtypes"
)
type EthSubHandler struct {
queued map[ethtypes.EthSubscriptionID][]ethtypes.EthSubscriptionResponse
sinks map[ethtypes.EthSubscriptionID]func(context.Context, *ethtypes.EthSubscriptionResponse) error
lk sync.Mutex
}
func NewEthSubHandler() *EthSubHandler {
return &EthSubHandler{
queued: make(map[ethtypes.EthSubscriptionID][]ethtypes.EthSubscriptionResponse),
sinks: make(map[ethtypes.EthSubscriptionID]func(context.Context, *ethtypes.EthSubscriptionResponse) error),
}
}
func (e *EthSubHandler) AddSub(ctx context.Context, id ethtypes.EthSubscriptionID, sink func(context.Context, *ethtypes.EthSubscriptionResponse) error) error {
e.lk.Lock()
defer e.lk.Unlock()
for _, p := range e.queued[id] {
p := p // copy
if err := sink(ctx, &p); err != nil {
return err
}
}
delete(e.queued, id)
e.sinks[id] = sink
return nil
}
func (e *EthSubHandler) RemoveSub(id ethtypes.EthSubscriptionID) {
e.lk.Lock()
defer e.lk.Unlock()
delete(e.sinks, id)
delete(e.queued, id)
}
func (e *EthSubHandler) EthSubscription(ctx context.Context, r jsonrpc.RawParams) error {
p, err := jsonrpc.DecodeParams[ethtypes.EthSubscriptionResponse](r)
if err != nil {
return err
}
e.lk.Lock()
sink := e.sinks[p.SubscriptionID]
if sink == nil {
e.queued[p.SubscriptionID] = append(e.queued[p.SubscriptionID], p)
e.lk.Unlock()
return nil
}
e.lk.Unlock()
return sink(ctx, &p) // todo track errors and auto-unsubscribe on rpc conn close?
}
var _ api.EthSubscriber = (*EthSubHandler)(nil)