-
Notifications
You must be signed in to change notification settings - Fork 1
/
stream_reader.go
83 lines (66 loc) · 2.5 KB
/
stream_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package smpp
import (
"encoding/binary"
"io"
"net"
)
// NetworkStreamReader provides a mechanism for reading PDUs from an incoming TCP stream connection, breaking
// the stream into PDUs
type NetworkStreamReader struct {
connectionFromWhichToRead net.Conn
readBuffer []byte
pduBuffer []byte
attachedConnectionIsClosed bool
}
// NewNetworkStreamReader creates a NetworkStreamReader that operates on the identified connection
func NewNetworkStreamReader(fromConnection net.Conn) *NetworkStreamReader {
return &NetworkStreamReader{connectionFromWhichToRead: fromConnection, readBuffer: make([]byte, 65536), pduBuffer: make([]byte, 0, 65536), attachedConnectionIsClosed: false}
}
// Read performs a read of the associated TCP stream and attempts to extract one or more PDUs from the
// stream. If there are data left over after extracting zero or more PDUs, those data are saved, and
// subsequent Read values are appended to those data
func (reader *NetworkStreamReader) Read() ([]*PDU, error) {
bytesRead, err := reader.connectionFromWhichToRead.Read(reader.readBuffer)
if err != nil {
if err == io.EOF {
reader.attachedConnectionIsClosed = true
}
return nil, err
}
reader.pduBuffer = append(reader.pduBuffer, reader.readBuffer[:bytesRead]...)
extractedPDUs := make([]*PDU, 0, 3)
for len(reader.pduBuffer) >= 16 {
pduLength := uint32(binary.BigEndian.Uint32(reader.pduBuffer[0:4]))
if len(reader.pduBuffer) >= int(pduLength) {
pdu, err := DecodePDU(reader.pduBuffer[:pduLength])
copy(reader.pduBuffer[0:len(reader.pduBuffer)-int(pduLength)], reader.pduBuffer[pduLength:])
reader.pduBuffer = reader.pduBuffer[:len(reader.pduBuffer)-int(pduLength)]
if err != nil {
return extractedPDUs, err
}
extractedPDUs = append(extractedPDUs, pdu)
} else {
return extractedPDUs, nil
}
}
return extractedPDUs, nil
}
// ExtractNextPDUs repeatedly reads from the TCP stream until there is at least one PDU.
// It returns the set of extracted PDUs, and like Read(), stores any remaining data for
// subsequent calls
func (reader *NetworkStreamReader) ExtractNextPDUs() ([]*PDU, error) {
for {
pdus, err := reader.Read()
if err != nil {
return nil, err
}
if len(pdus) > 0 {
return pdus, nil
}
}
}
// AttachedConnectionIsClosed returns true if the underlying TCP connection has
// closed, which is determined during a Read() read operation
func (reader *NetworkStreamReader) AttachedConnectionIsClosed() bool {
return reader.attachedConnectionIsClosed
}