forked from ehang-io/nps
-
Notifications
You must be signed in to change notification settings - Fork 3
/
base.go
99 lines (88 loc) · 2.52 KB
/
base.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package proxy
import (
"errors"
"github.com/cnlh/nps/bridge"
"github.com/cnlh/nps/lib/common"
"github.com/cnlh/nps/lib/conn"
"github.com/cnlh/nps/lib/file"
"github.com/cnlh/nps/vender/github.com/astaxie/beego/logs"
"net"
"net/http"
"sync"
)
type Service interface {
Start() error
Close() error
}
type NetBridge interface {
SendLinkInfo(clientId int, link *conn.Link, t *file.Tunnel) (target net.Conn, err error)
}
//BaseServer struct
type BaseServer struct {
id int
bridge NetBridge
task *file.Tunnel
errorContent []byte
sync.Mutex
}
func NewBaseServer(bridge *bridge.Bridge, task *file.Tunnel) *BaseServer {
return &BaseServer{
bridge: bridge,
task: task,
errorContent: nil,
Mutex: sync.Mutex{},
}
}
//add the flow
func (s *BaseServer) FlowAdd(in, out int64) {
s.Lock()
defer s.Unlock()
s.task.Flow.ExportFlow += out
s.task.Flow.InletFlow += in
}
//change the flow
func (s *BaseServer) FlowAddHost(host *file.Host, in, out int64) {
s.Lock()
defer s.Unlock()
host.Flow.ExportFlow += out
host.Flow.InletFlow += in
}
//write fail bytes to the connection
func (s *BaseServer) writeConnFail(c net.Conn) {
c.Write([]byte(common.ConnectionFailBytes))
c.Write(s.errorContent)
}
//auth check
func (s *BaseServer) auth(r *http.Request, c *conn.Conn, u, p string) error {
if u != "" && p != "" && !common.CheckAuth(r, u, p) {
c.Write([]byte(common.UnauthorizedBytes))
c.Close()
return errors.New("401 Unauthorized")
}
return nil
}
//check flow limit of the client ,and decrease the allow num of client
func (s *BaseServer) CheckFlowAndConnNum(client *file.Client) error {
if client.Flow.FlowLimit > 0 && (client.Flow.FlowLimit<<20) < (client.Flow.ExportFlow+client.Flow.InletFlow) {
return errors.New("Traffic exceeded")
}
if !client.GetConn() {
return errors.New("Connections exceed the current client limit")
}
return nil
}
//create a new connection and start bytes copying
func (s *BaseServer) DealClient(c *conn.Conn, client *file.Client, addr string, rb []byte, tp string, f func(), flow *file.Flow, localProxy bool) error {
link := conn.NewLink(tp, addr, client.Cnf.Crypt, client.Cnf.Compress, c.Conn.RemoteAddr().String(), localProxy)
if target, err := s.bridge.SendLinkInfo(client.Id, link, s.task); err != nil {
logs.Warn("get connection from client id %d error %s", client.Id, err.Error())
c.Close()
return err
} else {
if f != nil {
f()
}
conn.CopyWaitGroup(target, c.Conn, link.Crypt, link.Compress, client.Rate, flow, true, rb)
}
return nil
}