/
livefeed.go
68 lines (57 loc) · 2.15 KB
/
livefeed.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package spa
import (
"time"
"github.com/iotaledger/iota.go/transaction"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/workerpool"
"github.com/gohornet/hornet/packages/model/hornet"
"github.com/gohornet/hornet/packages/model/milestone_index"
tangle_model "github.com/gohornet/hornet/packages/model/tangle"
"github.com/gohornet/hornet/packages/shutdown"
"github.com/gohornet/hornet/plugins/tangle"
)
var liveFeedWorkerCount = 1
var liveFeedWorkerQueueSize = 50
var liveFeedWorkerPool *workerpool.WorkerPool
func configureLiveFeed() {
liveFeedWorkerPool = workerpool.New(func(task workerpool.Task) {
switch x := task.Param(0).(type) {
case *transaction.Transaction:
sendToAllWSClient(&msg{MsgTypeTx, &tx{x.Hash, x.Value}})
case milestone_index.MilestoneIndex:
if tailTx := getMilestone(x); tailTx != nil {
sendToAllWSClient(&msg{MsgTypeMs, &ms{tailTx.GetHash(), x}})
}
}
task.Return(nil)
}, workerpool.WorkerCount(liveFeedWorkerCount), workerpool.QueueSize(liveFeedWorkerQueueSize))
}
func runLiveFeed() {
newTxRateLimiter := time.NewTicker(time.Second / 10)
notifyNewTx := events.NewClosure(func(transaction *hornet.Transaction, firstSeenLatestMilestoneIndex milestone_index.MilestoneIndex, latestSolidMilestoneIndex milestone_index.MilestoneIndex) {
if !tangle_model.IsNodeSynced() {
return
}
select {
case <-newTxRateLimiter.C:
liveFeedWorkerPool.TrySubmit(transaction.Tx)
default:
}
})
notifyLMChanged := events.NewClosure(func(bndl *tangle_model.Bundle) {
liveFeedWorkerPool.TrySubmit(bndl.GetMilestoneIndex())
})
daemon.BackgroundWorker("SPA[TxUpdater]", func(shutdownSignal <-chan struct{}) {
tangle.Events.ReceivedNewTransaction.Attach(notifyNewTx)
tangle.Events.LatestMilestoneChanged.Attach(notifyLMChanged)
liveFeedWorkerPool.Start()
<-shutdownSignal
log.Info("Stopping SPA[TxUpdater] ...")
tangle.Events.ReceivedNewTransaction.Detach(notifyNewTx)
tangle.Events.LatestMilestoneChanged.Detach(notifyLMChanged)
newTxRateLimiter.Stop()
liveFeedWorkerPool.Stop()
log.Info("Stopping SPA[TxUpdater] ... done")
}, shutdown.ShutdownPrioritySPA)
}