-
Notifications
You must be signed in to change notification settings - Fork 33
/
video_img_handler.go
225 lines (195 loc) · 6.18 KB
/
video_img_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
package main
import (
"fmt"
"strings"
"go.nanomsg.org/mangos/v3"
// register transports
_ "go.nanomsg.org/mangos/v3/transport/all"
uj "github.com/nanoscopic/ujsonin/v2/mod"
log "github.com/sirupsen/logrus"
)
type ImgHandler struct {
inSock mangos.Socket
stopChan chan bool
imgConsumer *ImageConsumer
mainCh chan int
discard bool
imgNum int
sentSize bool
enableStream func()
disableStream func()
udid string
device *Device
isUp bool
}
func NewImgHandler( stopChan chan bool, udid string, device *Device ) ( *ImgHandler ) {
self := ImgHandler {
inSock: nil,
stopChan: stopChan,
mainCh: make( chan int ),
discard: true,
imgNum: 1,
udid: udid,
device: device,
isUp: false,
}
return &self
}
func ( self *ImgHandler ) setImageConsumer( imgConsumer *ImageConsumer ) {
self.imgConsumer = imgConsumer
}
func ( self *ImgHandler ) setEnableStream( enableStream func() ) {
self.enableStream = enableStream
}
func ( self *ImgHandler ) setDisableStream( disableStream func() ) {
self.disableStream = disableStream
}
func ( self *ImgHandler ) setSource( socket mangos.Socket ) {
self.inSock = socket
}
func ( self *ImgHandler ) processImgMsg() (int) {
msg, err := self.inSock.RecvMsg()
if err != nil {
if err == mangos.ErrRecvTimeout {
return 2
} else if err == mangos.ErrClosed {
fmt.Printf("Connection to video closed\n")
return 1
} else {
fmt.Printf( "Other error: %s", err )
return 0
}
}
self.imgNum = self.imgNum + 1
if ( self.imgNum % 30 ) == 0 {
fmt.Printf("Got incoming frame %d\n", self.imgNum)
}
if self.discard && self.sentSize {
msg.Free()
return 0
}
text := ""
data := []byte{}
// image is prepended by some JSON metadata
if len(msg.Body) == 0 {
fmt.Printf("nil message from video app\n")
return 0
}
if msg.Body[0] == '{' {
endi := strings.Index( string(msg.Body), "}" )
root, left := uj.Parse( msg.Body )
lenLeft := len( left )
if ( len(msg.Body ) - lenLeft - 1 ) != endi {
fmt.Printf( "size mistmatched what was parsed: %d != %d\n", endi, len( msg.Body ) - len(left) - 1 )
}
data = left
if lenLeft < 10 {
// it's just a text message
msgNode := root.Get("msg")
if msgNode != nil {
msg := msgNode.String()
if msg == "noframes" {
self.imgConsumer.noframes()
}
}
return 0
}
//ow := root.Get("ow").Int()
//oh := root.Get("oh").Int()
dw := root.Get("dw").Int()
dh := root.Get("dh").Int()
//fmt.Printf("ow=%d, oh=%d, dw=%d, dh=%d\n", ow, oh, dw, dh )
causeNode := root.Get("c")
cause := -1
if causeNode != nil { cause = causeNode.Int() }
crcNode := root.Get("crc")
crc := "n/a"
if crcNode != nil { crc = crcNode.String() }
text = fmt.Sprintf("{\"Width\": %d, \"Height\": %d, \"Size\": %d, \"Cause\": %d, \"Crc\": \"%s\"}",
dw, dh, len( msg.Body ), cause, crc )
if !self.isUp {
self.isUp = true
self.device.EventCh <- DevEvent{ action: DEV_VIDEO_START }
}
if !self.sentSize {
//json := fmt.Sprintf( `{"type":"frame1","width":%d,"height":%d,"uuid":"%s"}`, dw, dh, self.udid )
//fmt.Printf("FIRSTFRAME%s\n",json)
self.sentSize = true
}
} else {
data = msg.Body
}
if !self.discard {
err := self.imgConsumer.consume( text, data )
msg.Free()
if err != nil {
// might as well begin discarding since we can't send
self.discard = true
return 3
}
} else {
msg.Free()
}
return 0
}
func ( self *ImgHandler ) mainLoop( vidStopChan chan bool, controlStopChan chan bool, logStopChan chan bool) (int) {
var res int
reason := "unknown"
log.WithFields( log.Fields{
"type": "video_handler_start",
"udid": censorUuid( self.udid ),
} ).Info("Video handler start")
for {
select {
case <- controlStopChan:
fmt.Printf("Lost connection to control socket\n")
res = 3
goto DONE
case <- logStopChan:
fmt.Printf("Lost connection to log socket\n")
res = 5
goto DONE
case <- vidStopChan:
fmt.Printf("Lost connection to video stream\n")
self.isUp = false
self.device.EventCh <- DevEvent{ action: DEV_VIDEO_STOP }
res = 2
goto DONE
case <- self.stopChan:
//fmt.Printf("Server channel got stop message\n")
res = 1
reason = "Got stop message"
goto DONE
case msg := <- self.mainCh:
if msg == 1 {
self.enableStream()
fmt.Printf("Setting discard to false\n")
self.discard = false
}
if msg == 2 {
fmt.Printf("Setting discard to true\n")
self.discard = true
}
default: // this makes the above read from stopChannel non-blocking
}
pres := self.processImgMsg()
if pres == 1 {
res = 2
goto DONE
}
if pres == 3 { // lost send socket
res = 4
reason = "Lost send socket"
goto DONE
}
self.imgNum++
}
DONE:
log.WithFields( log.Fields{
"type": "video_handler_stop",
"udid": censorUuid( self.udid ),
"reason": reason,
} ).Info("Video handler stop")
self.disableStream()
return res
}