-
Notifications
You must be signed in to change notification settings - Fork 0
/
ui.go
141 lines (111 loc) · 3 KB
/
ui.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// ui.go defines a standard interface for UI status bindings across build platforms
package clientcore
import (
"net"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/getlantern/broflake/common"
netstatecl "github.com/getlantern/broflake/netstate/client"
)
const (
uiRefreshHz = 4
)
// XXX: This structure is used to maintain cumulative state for the identity of currently connected
// consumers, and it exists only for the purpose of reporting network graph data to netstated
type safeConsumerMap struct {
mu sync.RWMutex
v map[workerID]common.ConsumerInfo
}
func (c *safeConsumerMap) set(wid workerID, ci common.ConsumerInfo) {
c.mu.Lock()
defer c.mu.Unlock()
c.v[wid] = ci
}
func (c *safeConsumerMap) get(wid workerID) (ci common.ConsumerInfo, ok bool) {
c.mu.RLock()
defer c.mu.RUnlock()
ci, ok = c.v[wid]
return ci, ok
}
// Return only the currently connected consumers as a slice of 3-tuples: [IP addr, tag, workerIdx]
func (c *safeConsumerMap) slice() [][]string {
var s [][]string
c.mu.RLock()
defer c.mu.RUnlock()
for wid, cinfo := range c.v {
if !cinfo.Nil() {
s = append(s, []string{cinfo.Addr.String(), cinfo.Tag, strconv.Itoa(int(wid))})
}
}
return s
}
var connectedConsumers = safeConsumerMap{v: make(map[workerID]common.ConsumerInfo)}
type UI interface {
Init(bf *BroflakeEngine)
Start()
Stop()
Debug()
OnReady()
OnStartup()
OnDownstreamChunk(size int, workerIdx int)
OnDownstreamThroughput(bytesPerSec int)
OnConsumerConnectionChange(state int, workerIdx int, addr net.IP)
}
func DownstreamUIHandler(ui UIImpl, netstated, tag string) func(msg IPCMsg) {
var bytesPerSec int64
var tick uint
tickMs := time.Duration(1000 / uiRefreshHz)
go func() {
for {
<-time.After(tickMs * time.Millisecond)
ui.OnDownstreamThroughput(int(atomic.LoadInt64(&bytesPerSec)))
if tick%uiRefreshHz == 0 {
atomic.SwapInt64(&bytesPerSec, 0)
}
tick++
}
}()
return func(msg IPCMsg) {
switch msg.IpcType {
case ChunkIPC:
size := len(msg.Data.([]byte))
atomic.AddInt64(&bytesPerSec, int64(size))
ui.OnDownstreamChunk(size, int(msg.Wid))
}
}
}
func UpstreamUIHandler(ui UIImpl, netstated, tag string) func(msg IPCMsg) {
return func(msg IPCMsg) {
switch msg.IpcType {
case ConsumerInfoIPC:
ci := msg.Data.(common.ConsumerInfo)
// Fire a UI event for the consumer delta
state := 1
if ci.Nil() {
state = -1
}
ui.OnConsumerConnectionChange(state, int(msg.Wid), ci.Addr)
// Update our cumulative local state for all connected consumers
connectedConsumers.set(msg.Wid, ci)
if netstated != "" {
// Encode our cumulative local state as a netstate instruction
args := connectedConsumers.slice()
inst := &netstatecl.Instruction{
Op: netstatecl.OpConsumerState,
Args: netstatecl.EncodeArgsOpConsumerState(args),
Tag: tag,
}
// Send it to netstated!
err := netstatecl.Exec(
netstated,
inst,
)
if err != nil {
common.Debugf("Netstate client Exec error: %v", err)
}
}
}
}
}