/
listen.go
183 lines (153 loc) · 3.76 KB
/
listen.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
package p2p
import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"net"
	"time"

	log "github.com/cihub/seelog"
	"github.com/xtfly/gokits"

	"library/p2p/common"
)
// PeerConn wraps an incoming network connection and contains metadata that helps
// identify which active p2pSession it's relevant for.
type PeerConn struct {
// conn is the underlying network connection to the peer.
conn net.Conn
client bool // whether the remote peer is the client side
// remoteAddr is the peer's network address.
remoteAddr net.Addr
// taskID is the task identifier; StartListen fills it from the handshake header.
taskID string
}
// StartListen listens on a TCP port for incoming connections and
// demuxes them to the appropriate active p2pSession based on the taskId
// in the header.
//
// The listener is created with CreateListener. On success a goroutine is
// started that accepts connections, reads and validates the handshake header
// of each one, and sends a *PeerConn for every accepted peer on conChan.
// conChan is unbuffered, so the accept loop blocks until the receiver takes
// the connection.
//
// Temporary accept errors are retried with an exponential backoff that starts
// at 5ms and is capped at 1s. Any other accept error ends the goroutine;
// conChan is not closed in that case.
//
// Connections whose header cannot be read or fails validation are closed and
// dropped, so a misbehaving peer does not leak a socket.
//
// If the listener cannot be created, conChan and listener are nil and err
// describes the failure.
func StartListen(cfg *common.Config) (conChan chan *PeerConn, listener net.Listener, err error) {
	listener, err = CreateListener(cfg)
	if err != nil {
		// Return explicit nils so callers never see a half-initialised listener.
		return nil, nil, err
	}

	conChan = make(chan *PeerConn)
	go func(cfg *common.Config, conChan chan *PeerConn) {
		const maxDelay = 1 * time.Second
		var tempDelay time.Duration
		for {
			conn, e := listener.Accept()
			if e != nil {
				if ne, ok := e.(net.Error); ok && ne.Temporary() {
					if tempDelay == 0 {
						tempDelay = 5 * time.Millisecond
					} else {
						tempDelay *= 2
					}
					if tempDelay > maxDelay {
						tempDelay = maxDelay
					}
					log.Infof("Accept error: %v; retrying in %v", e, tempDelay)
					time.Sleep(tempDelay)
					continue
				}
				// Non-temporary error (e.g. the listener was closed): stop accepting.
				return
			}
			tempDelay = 0

			h, err := readPHeader(conn)
			if err != nil {
				log.Error("Error reading header: ", err)
				// The connection is unusable; release it instead of leaking it.
				conn.Close()
				continue
			}
			if err := h.validate(cfg); err != nil {
				log.Error("header auth failed:", err)
				// Authentication failed; drop the peer.
				conn.Close()
				continue
			}

			conChan <- &PeerConn{
				conn:       conn,
				client:     true,
				remoteAddr: conn.RemoteAddr(),
				taskID:     h.TaskID,
			}
		}
	}(cfg, conChan)
	return conChan, listener, nil
}
// CreateListener opens a TCP listener on the address given by cfg.Net.IP and
// cfg.Net.DataPort.
//
// cfg.Net.IP is parsed with net.ParseIP, which returns nil for text that is
// not a valid IP address; per the net package documentation a nil IP makes
// ListenTCP listen on all available local addresses.
//
// On failure the error is logged and returned, and the returned listener is a
// true nil interface value (not an interface wrapping a nil *net.TCPListener).
func CreateListener(cfg *common.Config) (listener net.Listener, err error) {
	// Keep the concrete *net.TCPListener in a local so that a nil pointer is
	// never stored in the net.Listener interface result.
	tcpListener, err := net.ListenTCP("tcp",
		&net.TCPAddr{
			IP:   net.ParseIP(cfg.Net.IP),
			Port: cfg.Net.DataPort,
		})
	if err != nil {
		log.Error("Listen failed:", err)
		return nil, err
	}

	log.Infof("Listening for peers on %s:%v", cfg.Net.IP, cfg.Net.DataPort)
	return tcpListener, nil
}
// readPHeader reads the handshake header from conn.
//
// The wire format is a big-endian int32 payload length followed by that many
// bytes. The length must be in the range 1..200. The payload holds four
// NUL-terminated strings in this order: task ID, username, password, salt.
//
// The payload is read with io.ReadFull, because a single Read on a stream
// connection may legitimately return fewer bytes than requested.
//
// On error the returned header may be partially populated and must not be
// used.
func readPHeader(conn net.Conn) (h *PHeader, err error) {
	// Upper bound on the payload size accepted from a peer.
	const maxHeaderLen = 200

	h = &PHeader{}

	var bslen int32
	err = binary.Read(conn, binary.BigEndian, &bslen)
	if err != nil {
		err = fmt.Errorf("Read length error: %v", err)
		return
	}
	if bslen <= 0 || bslen > maxHeaderLen {
		err = fmt.Errorf("read length is invalid: %v", bslen)
		return
	}

	bs := make([]byte, bslen)
	// Block until the whole payload has arrived (or the connection fails).
	_, err = io.ReadFull(conn, bs)
	if err != nil {
		err = fmt.Errorf("Couldn't read auth info: %v", err)
		return
	}
	h.Len = bslen

	buf := bytes.NewBuffer(bs)
	if h.TaskID, err = readString(buf); err != nil {
		return
	}
	if h.Username, err = readString(buf); err != nil {
		return
	}
	if h.Password, err = readString(buf); err != nil {
		return
	}
	if h.Salt, err = readString(buf); err != nil {
		return
	}
	return
}
func readString(buf *bytes.Buffer) (str string, err error) {
if str, err = buf.ReadString(byte(0x00)); err != nil {
err = fmt.Errorf("Read string error: %v", err)
return
}
str = str[:len(str)-1]
return
}
// writePHeader sends the handshake header for taskID over conn.
//
// The packet is a big-endian int32 payload length followed by four
// NUL-terminated fields: the task ID, cfg.Auth.Username, and the two values
// returned by gokits.GenPasswd(cfg.Auth.Password, 8) (presumably the salted
// password and its salt; confirm against gokits). The whole packet is
// written with a single conn.Write call, whose error is returned.
func writePHeader(conn net.Conn, taskID string, cfg *common.Config) (err error) {
	pwd, salt := gokits.GenPasswd(cfg.Auth.Password, 8)
	fields := [][]byte{
		[]byte(taskID),
		[]byte(cfg.Auth.Username),
		[]byte(pwd),
		[]byte(salt),
	}

	// Each field contributes its bytes plus one NUL terminator.
	payloadLen := len(fields)
	for i := range fields {
		payloadLen += len(fields[i])
	}

	var packet bytes.Buffer
	binary.Write(&packet, binary.BigEndian, int32(payloadLen))
	for i := range fields {
		packet.Write(fields[i])
		packet.WriteByte(0)
	}

	_, err = conn.Write(packet.Bytes())
	return err
}
// validate checks the credentials carried in the header against cfg.
//
// The username must equal cfg.Auth.Username and gokits.CmpPasswd must accept
// cfg.Auth.Password together with the header's salt and password. Both
// failures produce the same error message, so the caller cannot tell which
// check failed.
func (h *PHeader) validate(cfg *common.Config) error {
	userOK := h.Username == cfg.Auth.Username
	// Short-circuit: the password is only compared when the username matched.
	if userOK && gokits.CmpPasswd(cfg.Auth.Password, h.Salt, h.Password) {
		return nil
	}
	return fmt.Errorf("username or password is incorrect")
}