-
Notifications
You must be signed in to change notification settings - Fork 0
/
transaction_sent_to_upstream_event.go
103 lines (84 loc) · 2.1 KB
/
transaction_sent_to_upstream_event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package rpcfilters
import (
"errors"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)
// transactionSentToUpstreamEvent represents an event that one can subscribe to
type transactionSentToUpstreamEvent struct {
sxMu sync.Mutex
sx map[int]chan common.Hash
listener chan common.Hash
quit chan struct{}
}
func newTransactionSentToUpstreamEvent() *transactionSentToUpstreamEvent {
return &transactionSentToUpstreamEvent{
sx: make(map[int]chan common.Hash),
listener: make(chan common.Hash),
}
}
func (e *transactionSentToUpstreamEvent) Start() error {
if e.quit != nil {
return errors.New("latest transaction sent to upstream event is already started")
}
e.quit = make(chan struct{})
go func() {
for {
select {
case transactionHash := <-e.listener:
if e.numberOfSubscriptions() == 0 {
continue
}
e.processTransactionSentToUpstream(transactionHash)
case <-e.quit:
return
}
}
}()
return nil
}
func (e *transactionSentToUpstreamEvent) numberOfSubscriptions() int {
e.sxMu.Lock()
defer e.sxMu.Unlock()
return len(e.sx)
}
func (e *transactionSentToUpstreamEvent) processTransactionSentToUpstream(transactionHash common.Hash) {
e.sxMu.Lock()
defer e.sxMu.Unlock()
for id, channel := range e.sx {
select {
case channel <- transactionHash:
default:
log.Error("dropping messages %s for subscriotion %d because the channel is full", transactionHash, id)
}
}
}
func (e *transactionSentToUpstreamEvent) Stop() {
if e.quit == nil {
return
}
select {
case <-e.quit:
return
default:
close(e.quit)
}
}
func (e *transactionSentToUpstreamEvent) Subscribe() (int, chan common.Hash) {
e.sxMu.Lock()
defer e.sxMu.Unlock()
channel := make(chan common.Hash, 512)
id := len(e.sx)
e.sx[id] = channel
return id, channel
}
func (e *transactionSentToUpstreamEvent) Unsubscribe(id int) {
e.sxMu.Lock()
defer e.sxMu.Unlock()
delete(e.sx, id)
}
// Trigger gets called in order to trigger the event
func (e *transactionSentToUpstreamEvent) Trigger(transactionHash common.Hash) {
e.listener <- transactionHash
}