forked from stampzilla/gocast
/
packetstream.go
86 lines (67 loc) · 1.56 KB
/
packetstream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package gocast
import (
"encoding/binary"
log "github.com/sirupsen/logrus"
"io"
)
type packetStream struct {
stream io.ReadWriteCloser
packets chan packetContainer
}
type packetContainer struct {
payload *[]byte
err error
}
func NewPacketStream(stream io.ReadWriteCloser) *packetStream {
wrapper := packetStream{
stream: stream,
packets: make(chan packetContainer),
}
wrapper.readPackets()
return &wrapper
}
func (w *packetStream) readPackets() {
var length uint32
go func() {
for {
err := binary.Read(w.stream, binary.BigEndian, &length)
if err != nil {
log.Warnf("Failed binary.Read packet: %s", err)
w.packets <- packetContainer{err: err, payload: nil}
return
}
//TODO make sure this goroutine is killed on disconnect
if length > 0 {
packet := make([]byte, length)
i, err := w.stream.Read(packet)
if err != nil {
log.Warnf("Failed to read packet: %s", err)
continue
}
if i != int(length) {
log.Warnf("Invalid packet size. Wanted: %d Read: %d", length, i)
continue
}
w.packets <- packetContainer{
payload: &packet,
err: nil,
}
}
}
}()
}
func (w *packetStream) Read() (*[]byte, error) {
pkt := <-w.packets
if pkt.err != nil {
close(w.packets)
}
return pkt.payload, pkt.err
}
func (w *packetStream) Write(data []byte) (int, error) {
err := binary.Write(w.stream, binary.BigEndian, uint32(len(data)))
if err != nil {
err = log.Errorf("Failed to write packet length %d. error:%s\n", len(data), err)
return 0, err
}
return w.stream.Write(data)
}