/
tcp_reader.go
144 lines (131 loc) · 4.09 KB
/
tcp_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package tunnel
import (
"bytes"
"encoding/binary"
"io"
"net"
"github.com/gugemichael/nimo4go"
LOG "github.com/vinllen/log4go"
)
type TCPReader struct {
// listen
listenAddress string
// for golang tcp socket
channel [2]*ListenSocket
replayer []Replayer
ack int64
}
type ListenSocket struct {
addr *net.TCPAddr
listener *net.TCPListener
}
func (reader *TCPReader) Link(replayer []Replayer) (err error) {
reader.replayer = replayer
for i := 0; i != TotalQueueNum; i++ {
reader.channel[i] = new(ListenSocket)
reader.channel[i].addr, err = net.ResolveTCPAddr("tcp4", reader.listenAddress)
if err != nil {
LOG.Critical("Resolve channel listenAddress error: %s", err.Error())
return err
}
}
reader.channel[RecvAckChannel].addr.Port = reader.channel[RecvAckChannel].addr.Port + 1
for i := 0; i != TotalQueueNum; i++ {
reader.channel[i].listener, err = net.ListenTCP("tcp", reader.channel[i].addr)
if err != nil {
LOG.Critical("Tcp reader server listen %v error: %s", reader.channel[i].addr, err.Error())
return err
}
}
// fork listen acceptor for oplog transfer tunnel
nimo.GoRoutineInLoop(func() {
socket, err := reader.channel[TransferChannel].listener.AcceptTCP()
if err != nil {
LOG.Warn("Server accept channel error : %s", err.Error())
return
}
socket.SetNoDelay(false)
socket.SetLinger(0)
socket.SetReadBuffer(1024 * 1024 * 16)
nimo.GoRoutine(func() {
reader.recvTransfer(socket)
})
})
// fork listen acceptor for ack value query tunnel
nimo.GoRoutineInLoop(func() {
socket, err := reader.channel[RecvAckChannel].listener.AcceptTCP()
if err != nil {
LOG.Warn("Server ACK accept ch error : %s", err.Error())
return
}
socket.SetNoDelay(true)
socket.SetLinger(0)
nimo.GoRoutine(func() {
reader.recvGetAck(socket)
})
})
return nil
}
func (reader *TCPReader) recvTransfer(socket *net.TCPConn) {
defer socket.Close()
// every entire packet just for one loop time
header := [HeaderLen]byte{}
for {
socketTimeout(socket, NetworkDefaultTimeout*10)
// read util entire header
if _, err := io.ReadAtLeast(socket, header[:], HeaderLen); err != nil {
LOG.Warn("Server transfer read header at least failed readAtLeast %d, %s",
HeaderLen, err.Error())
return
}
packet := NewPacketV1(PacketIncomplete, nil)
if !packet.decodeHeader(header[:]) {
LOG.Warn("Server transfer decode header failed")
return
}
nimo.AssertTrue(packet.typeOf == PacketWrite && packet.length != 0, "transfer receive bad type packet")
payload := make([]byte, packet.length)
if _, err := io.ReadAtLeast(socket, payload, int(packet.length)); err != nil {
LOG.Warn("Server transfer read packet at least failed readAtLeast %d, %s",
packet.length, err.Error())
return
}
message := new(TMessage)
message.FromBytes(payload, binary.BigEndian)
// hash corresponding replayer and re-sharding
if message.Shard >= uint32(len(reader.replayer)) {
message.Shard %= uint32(len(reader.replayer))
}
reader.ack = reader.replayer[message.Shard].Sync(message, nil)
}
}
func (reader *TCPReader) recvGetAck(socket *net.TCPConn) {
defer socket.Close()
// every entire packet just for one loop time
header := [HeaderLen]byte{}
for {
socketTimeout(socket, NetworkDefaultTimeout)
// read util entire header
if _, err := io.ReadAtLeast(socket, header[:], HeaderLen); err != nil {
LOG.Warn("Server ack read header at least failed readAtLeast %d, %s",
HeaderLen, err.Error())
return
}
packet := NewPacketV1(PacketIncomplete, nil)
if !packet.decodeHeader(header[:]) {
LOG.Warn("Server ack decode header failed")
return
}
nimo.AssertTrue(packet.typeOf == PacketGetACK && packet.length == 0, "ack receive bad type packet")
// write back ack
buffer := &bytes.Buffer{}
binary.Write(buffer, binary.BigEndian, reader.ack)
packet = NewPacketV1(PacketReturnACK, buffer.Bytes())
if _, err := socket.Write(packet.encode()); err != nil {
if err, ok := err.(net.Error); ok && err.Timeout() {
LOG.Warn("Tcp ack send ack back timeout")
}
}
nimo.AssertTrue(packet.length != 0, "ack send bad PacketReturnACK packet len")
}
}