-
Notifications
You must be signed in to change notification settings - Fork 42
/
joint.go
125 lines (113 loc) · 2.43 KB
/
joint.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package core
import (
"net"
"reflect"
"time"
)
type buffer struct {
buf []byte
n int
}
func (b *buffer) String() string {
return string(b.bytes(0))
}
func (b *buffer) bytes(from int) []byte {
return b.buf[from:b.n]
}
func (b *buffer) free() {
if b.buf != nil {
ByteArrPool.Put(b.buf)
b.buf = nil
}
b.n = 0
}
func readDataFromConn(connId uint64, conn net.Conn) chan *buffer {
channel := make(chan *buffer)
go func() {
defer func() {
unknownError := recover()
if unknownError != nil {
if err, ok := unknownError.(error); ok {
errMsg := err.Error()
switch errMsg {
case IgnoreErrorSendOnClosedChannel, IgnoreErrorInvalidMemoryAddress:
return
}
Log.Errorf("Unknown Error: %s", errMsg)
} else {
Log.Errorf("Unknown Error(%v): %v", reflect.TypeOf(unknownError), unknownError)
}
}
}()
var e error
var b *buffer
for {
b = &buffer{buf: ByteArrPool.Get(), n: 0}
b.n, e = conn.Read(b.buf)
if e != nil {
Log.Debugf("Connect(%d) data read-err: %v", connId, e)
b.free()
return
}
if b.n == 0 {
b.free()
return
}
Log.Debugf("Connect(%d) data: %v", connId, b)
channel <- b
}
}()
return channel
}
func writeDataToConn(conn net.Conn, b *buffer) error {
defer b.free()
for i := 0; i < b.n; {
n, e := conn.Write(b.bytes(i))
if e != nil {
return e
}
i += n
}
return nil
}
func connJoint(connId uint64, tcpConn *net.TCPConn, ahriConn *AhriConn) {
defer func() {
unknownError := recover()
if unknownError != nil {
if err, ok := unknownError.(error); ok {
errMsg := err.Error()
switch errMsg {
case IgnoreErrorSendOnClosedChannel, IgnoreErrorInvalidMemoryAddress:
return
}
Log.Errorf("Unknown Error: %s", errMsg)
} else {
Log.Errorf("Unknown Error(%v): %v", reflect.TypeOf(unknownError), unknownError)
}
}
}()
tcpConn.SetNoDelay(true)
dataChan0 := readDataFromConn(connId, tcpConn)
dataChan1 := readDataFromConn(connId, ahriConn)
for {
var e error
select {
case <-time.After(time.Duration(AhriTimeoutSec) * time.Second):
goto loopEnd
case data0 := <-dataChan0:
e = writeDataToConn(ahriConn, data0)
case data1 := <-dataChan1:
e = writeDataToConn(tcpConn, data1)
}
if e != nil {
Log.Warnf("Connect(%d) data write-err: %v", connId, e)
goto loopEnd
}
}
loopEnd:
tcpConn.Close()
ahriConn.Close()
close(dataChan0)
close(dataChan1)
Log.Debugf("Connect(%d) Closed.", connId)
}