-
Notifications
You must be signed in to change notification settings - Fork 0
/
protocol.go
112 lines (95 loc) · 3.06 KB
/
protocol.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
// protocol.go provides primitives for defining client protocol behavior
package clientcore
import (
"context"
"math"
"math/rand"
"sync"
"github.com/getlantern/broflake/common"
)
const (
workerBufferSz = 4096
)
// WorkerFSM implements a Mealy machine: https://en.wikipedia.org/wiki/Mealy_machine
// A WorkerFSM independently manages the lifetime of a single connection slot. A client maintains
// two pools of WorkerFSMs - one for upstream channels and one for downstream channels.
type WorkerFSM struct {
com *ipcChan
currentState int
nextInput []interface{}
state []FSMstate
ctx context.Context
cancel context.CancelFunc
wg *sync.WaitGroup
}
// Construct a new WorkerFSM
func NewWorkerFSM(wg *sync.WaitGroup, states []FSMstate) *WorkerFSM {
fsm := WorkerFSM{
com: newIpcChan(workerBufferSz),
state: states,
wg: wg,
}
return &fsm
}
// Start this WorkerFSM
func (fsm *WorkerFSM) Start() {
if fsm.wg != nil {
fsm.wg.Add(1)
}
go func() {
defer func() {
if fsm.wg != nil {
fsm.wg.Done()
}
}()
common.Debug("Starting WorkerFSM...")
fsm.ctx, fsm.cancel = context.WithCancel(context.Background())
for {
select {
case <-fsm.ctx.Done():
common.Debug("End of last state, stopping WorkerFSM...")
return
default:
fsm.currentState, fsm.nextInput = fsm.state[fsm.currentState](fsm.ctx, fsm.com, fsm.nextInput)
}
}
}()
}
// Stop this WorkerFSM (takes effect upon returning from the currently executing state)
func (fsm *WorkerFSM) Stop() {
fsm.cancel()
}
// FSMstate encapsulates logic for one state in a WorkerFSM. An FSMstate must return an int
// corresponding to the next state and a list of any inputs to propagate to the next state.
// TODO: a state's number simply corresponds to its index in WorkerFSM.state, but we perform no
// sanity checking of state indices.
type FSMstate func(ctx context.Context, com *ipcChan, input []interface{}) (int, []interface{})
// STUNCache implements the operations which support our strategy for evading STUN server blocking
// in-country. That is: populate the cache with the largest set of currently known STUN servers and
// shuffle it; select a cohort of the first N servers in the list to use in parallel; if any of
// those servers work, continue using that cohort; if all of those servers fail, delete the cohort
// from the list; when the list is empty, repeat the steps.
type STUNCache struct {
data []string
n float64
}
func newSTUNCache(srvs []string, n float64) STUNCache {
dest := make([]string, len(srvs))
copy(dest, srvs)
rand.Shuffle(len(dest), func(i, j int) {
dest[i], dest[j] = dest[j], dest[i]
})
return STUNCache{data: dest, n: n}
}
// Return the size of the STUNCache
func (s *STUNCache) size() int {
return len(s.data)
}
// Get the current cohort of servers in the STUNCache
func (s *STUNCache) cohort() []string {
return s.data[:int(math.Min(s.n, float64(len(s.data))))]
}
// Delete the current cohort of servers from the STUNCache
func (s *STUNCache) drop() {
s.data = s.data[int(math.Min(s.n, float64(len(s.data)))):]
}