-
Notifications
You must be signed in to change notification settings - Fork 35
/
worker.go
130 lines (108 loc) · 2.39 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package server
import (
"errors"
"fmt"
"net"
"sync"
"time"
"github.com/fatedier/fft/pkg/log"
"github.com/fatedier/fft/pkg/msg"
)
var (
ErrPublicAddr = errors.New("no public address")
)
type Worker struct {
conn net.Conn
port int64
advicePublicIP string
publicAddr string
}
func NewWorker(port int64, advicePublicIP string, conn net.Conn) *Worker {
return &Worker{
port: port,
advicePublicIP: advicePublicIP,
conn: conn,
}
}
func (w *Worker) PublicAddr() string {
return w.publicAddr
}
func (w *Worker) DetectPublicAddr() error {
host, _, err := net.SplitHostPort(w.conn.RemoteAddr().String())
if err != nil {
return fmt.Errorf("parse worker address error: %v", err)
}
ip := w.advicePublicIP
if ip == "" {
ip = host
}
detectAddr := net.JoinHostPort(ip, fmt.Sprintf("%d", w.port))
log.Debug("worker detect address: %s", detectAddr)
detectConn, err := net.Dial("tcp", detectAddr)
if err != nil {
log.Warn("dial worker public address error: %v", err)
return ErrPublicAddr
}
msg.WriteMsg(detectConn, &msg.Ping{})
detectConn.SetReadDeadline(time.Now().Add(5 * time.Second))
m, err := msg.ReadMsg(detectConn)
if err != nil {
log.Warn("read pong from detectConn error: %v", err)
return ErrPublicAddr
}
if _, ok := m.(*msg.Pong); !ok {
return ErrPublicAddr
}
detectConn.Close()
w.publicAddr = detectAddr
return nil
}
func (w *Worker) RunKeepAlive(closeCallback func()) {
defer func() {
if closeCallback != nil {
closeCallback()
}
}()
for {
w.conn.SetReadDeadline(time.Now().Add(30 * time.Second))
m, err := msg.ReadMsg(w.conn)
if err != nil {
w.conn.Close()
return
}
if _, ok := m.(*msg.Ping); !ok {
w.conn.Close()
return
}
msg.WriteMsg(w.conn, &msg.Pong{})
}
}
type WorkerGroup struct {
workers map[string]*Worker
mu sync.RWMutex
}
func NewWorkerGroup() *WorkerGroup {
return &WorkerGroup{
workers: make(map[string]*Worker),
}
}
func (wg *WorkerGroup) RegisterWorker(w *Worker) {
closeCallback := func() {
wg.mu.Lock()
delete(wg.workers, w.PublicAddr())
wg.mu.Unlock()
}
wg.mu.Lock()
wg.workers[w.PublicAddr()] = w
go w.RunKeepAlive(closeCallback)
wg.mu.Unlock()
}
func (wg *WorkerGroup) GetAvailableWorkerAddrs() []string {
addrs := make([]string, 0)
wg.mu.RLock()
defer wg.mu.RUnlock()
for addr, _ := range wg.workers {
addrs = append(addrs, addr)
}
return addrs
}