/
fsconn.go
393 lines (347 loc) · 11.1 KB
/
fsconn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
/*
fsock.go is released under the MIT License <http://www.opensource.org/licenses/mit-license.php
Copyright (C) ITsysCOM. All Rights Reserved.
Provides FreeSWITCH socket communication.
*/
package fsock
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"net"
"strconv"
"strings"
"sync"
"time"
)
// NewFSConn constructs and connects a FSConn
func NewFSConn(addr, passwd string,
connIdx int,
replyTimeout time.Duration,
connErr chan error,
lgr logger,
evFilters map[string][]string,
eventHandlers map[string][]func(string, int),
bgapi bool,
) (*FSConn, error) {
fsConn := &FSConn{
connIdx: connIdx,
replyTimeout: replyTimeout,
lgr: lgr,
err: connErr,
replies: make(chan string),
eventHandlers: eventHandlers,
bgapiChan: make(map[string]chan string),
bgapiMux: new(sync.RWMutex),
}
// Build the TCP connection and the buffer reading it
var err error
if fsConn.conn, err = net.Dial("tcp", addr); err != nil {
fsConn.lgr.Err(fmt.Sprintf("<FSock> Attempt to connect to FreeSWITCH, received: %s", err.Error()))
return nil, err
}
fsConn.rdr = bufio.NewReaderSize(fsConn.conn, 8192) // reinit buffer
fsConn.lgr.Info("<FSock> Successfully connected to FreeSWITCH!")
// Connected, auth and subscribe to desired events and filters
var authChlng string
if authChlng, err = fsConn.readHeaders(); err != nil {
return nil, err
}
if !strings.Contains(authChlng, "auth/request") {
fsConn.conn.Close()
return nil, errors.New("no auth challenge received")
}
if err = fsConn.auth(passwd); err != nil { // Auth did not succeed
return nil, err
}
if err = fsConn.filterEvents(evFilters, bgapi); err != nil {
return nil, err
}
if err = fsConn.eventsPlain(getMapKeys(eventHandlers),
bgapi); err != nil {
return nil, err
}
go fsConn.readEvents() // Fork read events in it's own goroutine
return fsConn, nil
}
type FSConn struct {
connIdx int // Identifier for the component using this instance of FSConn, optional
replyTimeout time.Duration // Timeout for awaiting replies
conn net.Conn // TCP connection to FreeSWITCH
rdr *bufio.Reader // Reader for the TCP connection
lgr logger // Logger for logging messages
err chan error // Channel for reporting errors
replies chan string // Channel for receiving replies
eventHandlers map[string][]func(string, int) // eventStr, connId, handles events
bgapiChan map[string]chan string // Channels used by bgapi
bgapiMux *sync.RWMutex // Protects the bgapiChan map
}
// readHeaders reads and parses the headers from a FreeSWITCH response.
func (fsConn *FSConn) readHeaders() (header string, err error) {
bytesRead := make([]byte, 0) // buffer to accumulate header bytes
var readLine []byte // temporary slice to hold each line read
for {
if readLine, err = fsConn.rdr.ReadBytes('\n'); err != nil {
fsConn.lgr.Err(fmt.Sprintf(
"<FSock> Error reading headers: <%v>", err))
fsConn.conn.Close() // close the connection regardless
// Distinguish between errors to handle reconnect logic. If it's not
// a network operation error (net.OpError), return io.EOF to signal a
// reconnect. Otherwise, return the actual error encountered.
var opErr *net.OpError
if !errors.As(err, &opErr) {
return "", io.EOF
}
return "", err
}
// Check if the line is empty.
if len(bytes.TrimSpace(readLine)) == 0 {
// Empty line indicates the end of the headers, exit loop.
break
}
bytesRead = append(bytesRead, readLine...)
}
return string(bytesRead), nil
}
// auth authenticates the connection with FreeSWITCH using the provided password.
func (fsConn *FSConn) auth(passwd string) (err error) {
if err = fsConn.send("auth " + passwd + "\n\n"); err != nil {
fsConn.conn.Close()
return
}
var rply string
if rply, err = fsConn.readHeaders(); err != nil {
return
}
if !strings.Contains(rply, "Reply-Text: +OK accepted") {
fsConn.conn.Close()
return fmt.Errorf("unexpected auth reply received: <%s>", rply)
}
return
}
// filterEvents will filter the Events coming from FreeSWITCH.
func (fsConn *FSConn) filterEvents(filters map[string][]string, bgapi bool) (err error) {
if len(filters) == 0 {
return nil
}
if bgapi {
filters["Event-Name"] = append(filters["Event-Name"], "BACKGROUND_JOB") // for bgapi
}
for hdr, vals := range filters {
for _, val := range vals {
if err = fsConn.send("filter " + hdr + " " + val + "\n\n"); err != nil {
fsConn.lgr.Err(fmt.Sprintf("<FSock> Error filtering events: <%s>", err.Error()))
fsConn.conn.Close()
return
}
var rply string
if rply, err = fsConn.readHeaders(); err != nil {
return
}
if !strings.Contains(rply, "Reply-Text: +OK") {
fsConn.conn.Close()
return fmt.Errorf(`unexpected filter-events reply received: <%s>`, rply)
}
}
}
return nil
}
// send will send the content over the connection.
func (fsConn *FSConn) send(sendContent string) (err error) {
if _, err = fsConn.conn.Write([]byte(sendContent)); err != nil {
fsConn.lgr.Err(fmt.Sprintf("<FSock> Cannot write command to socket <%s>", err.Error()))
}
return
}
// eventsPlain will subscribe for events in plain mode.
func (fsConn *FSConn) eventsPlain(events []string, bgapi bool) (err error) {
eventsCmd := "event plain"
customEvents := ""
for _, ev := range events {
if ev == "ALL" {
eventsCmd = "event plain all"
break
}
if strings.HasPrefix(ev, "CUSTOM") {
customEvents += ev[6:] // will capture here also space between CUSTOM and event
continue
}
eventsCmd += " " + ev
}
if eventsCmd != "event plain all" {
if bgapi {
eventsCmd += " BACKGROUND_JOB" // For bgapi
}
if len(customEvents) != 0 { // Add CUSTOM events subscribing in the end otherwise unexpected events are received
eventsCmd += " " + "CUSTOM" + customEvents
}
}
if err = fsConn.send(eventsCmd + "\n\n"); err != nil {
fsConn.conn.Close()
return
}
var rply string
if rply, err = fsConn.readHeaders(); err != nil {
return
}
if !strings.Contains(rply, "Reply-Text: +OK") {
fsConn.conn.Close()
return fmt.Errorf("unexpected events-subscribe reply received: <%s>", rply)
}
return
}
// readEvent will read one Event from FreeSWITCH, made out of headers and body (if present).
func (fsConn *FSConn) readEvent() (header string, body string, err error) {
if header, err = fsConn.readHeaders(); err != nil {
return
}
if !strings.Contains(header, "Content-Length") { //No body
return
}
var cl int
if cl, err = strconv.Atoi(headerVal(header, "Content-Length")); err != nil {
err = fmt.Errorf("cannot extract content length, err: <%s>", err)
return
}
body, err = fsConn.readBody(cl)
return
}
// readBody reads the specified number of bytes from the buffer.
// The number of bytes to read is given by 'noBytes', which is determined from the content-length header.
func (fsConn *FSConn) readBody(noBytes int) (string, error) {
bytesRead := make([]byte, noBytes)
_, err := io.ReadFull(fsConn.rdr, bytesRead)
if err != nil {
fsConn.lgr.Err(fmt.Sprintf("<FSock> Error reading message body: <%v>", err))
fsConn.conn.Close()
return "", io.EOF // Return io.EOF to trigger ReconnectIfNeeded.
}
return string(bytesRead), nil
}
// readEvents continuously reads and processes events from the network buffer. It stops
// and exits the loop if an error is encountered, after sending it to fsConn.err.
func (fsConn *FSConn) readEvents() {
for {
hdr, body, err := fsConn.readEvent()
// If an error occurs during the read operation, report
// it on the error channel and exit the loop.
if err != nil {
fsConn.err <- err
return
}
switch {
case strings.Contains(hdr, "api/response"):
// For API responses, send the body
// directly to the replies channel.
fsConn.replies <- body
case strings.Contains(hdr, "command/reply"):
// For command replies, extract the "Reply-Text" from
// the header and send it to the replies channel.
fsConn.replies <- headerVal(hdr, "Reply-Text")
case body != "":
// Could be an event, try dispatching it.
fsConn.dispatchEvent(body)
}
}
}
// Dispatch events to handlers in async mode
func (fsConn *FSConn) dispatchEvent(event string) {
eventName := headerVal(event, "Event-Name")
if eventName == "BACKGROUND_JOB" { // for bgapi BACKGROUND_JOB
go fsConn.doBackgroundJob(event)
return
}
if eventName == "CUSTOM" {
eventSubclass := headerVal(event, "Event-Subclass")
if len(eventSubclass) != 0 {
eventName += " " + urlDecode(eventSubclass)
}
}
for _, handleName := range []string{eventName, "ALL"} {
if _, hasHandlers := fsConn.eventHandlers[handleName]; hasHandlers {
// We have handlers, dispatch to all of them
for _, handlerFunc := range fsConn.eventHandlers[handleName] {
go handlerFunc(event, fsConn.connIdx)
}
return
}
}
fsConn.lgr.Warning(fmt.Sprintf("<FSock> No dispatcher for event: <%+v> with event name: %s", event, eventName))
}
// bgapi event lisen fuction
func (fsConn *FSConn) doBackgroundJob(event string) { // add mutex protection
evMap := EventToMap(event)
jobUUID, has := evMap["Job-UUID"]
if !has {
fsConn.lgr.Err("<FSock> BACKGROUND_JOB with no Job-UUID")
return
}
fsConn.bgapiMux.Lock()
defer fsConn.bgapiMux.Unlock()
var out chan string
out, has = fsConn.bgapiChan[jobUUID]
if !has {
fsConn.lgr.Err(fmt.Sprintf("<FSock> BACKGROUND_JOB with UUID %s lost!", jobUUID))
return // not a requested bgapi
}
delete(fsConn.bgapiChan, jobUUID)
out <- evMap[EventBodyTag]
}
// Send will send the content over the connection, exposing synchronous interface outside
func (fsConn *FSConn) Send(payload string) (string, error) {
if err := fsConn.send(payload); err != nil {
return "", err
}
// Prepare a context based on fsConn.replyTimeout
var ctx context.Context
var cancel context.CancelFunc
if fsConn.replyTimeout > 0 {
ctx, cancel = context.WithTimeout(context.Background(), fsConn.replyTimeout)
} else {
ctx, cancel = context.WithCancel(context.Background())
}
defer cancel()
replies := make(chan string)
replyErrors := make(chan error)
go func() {
select {
case reply := <-fsConn.replies:
if strings.Contains(reply, "-ERR") {
replyErrors <- errors.New(strings.TrimSpace(reply))
return
}
replies <- reply
case <-ctx.Done():
replyErrors <- ctx.Err()
}
}()
select {
case reply := <-replies:
return reply, nil
case err := <-replyErrors:
return "", err
}
}
// Send BGAPI command
func (fsConn *FSConn) SendBgapiCmd(cmdStr string) (out chan string, err error) {
jobUUID := genUUID()
out = make(chan string)
fsConn.bgapiMux.Lock()
fsConn.bgapiChan[jobUUID] = out
fsConn.bgapiMux.Unlock()
if _, err = fsConn.Send("bgapi " + cmdStr + "\nJob-UUID:" + jobUUID + "\n\n"); err != nil {
return nil, err
}
return
}
// Disconnect will disconnect the fsConn from FreeSWITCH
func (fsConn *FSConn) Disconnect() error {
return fsConn.conn.Close()
}
// LocalAddr returns the local address of the connection
func (fsConn *FSConn) LocalAddr() net.Addr {
return fsConn.conn.LocalAddr()
}