-
Notifications
You must be signed in to change notification settings - Fork 2
/
osipsdagram.go
284 lines (259 loc) · 7.9 KB
/
osipsdagram.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
/*
osipsdagram is released under the MIT License <http://www.opensource.org/licenses/mit-license.php
Copyright (C) ITsysCOM GmbH. All Rights Reserved.
Provides OpenSIPS mi_datagram communication and event server.
*/
package osipsdagram
import (
"bytes"
"errors"
"io"
"net"
"sync"
"time"
)
func fib() func() int {
a, b := 0, 1
return func() int {
a, b = b, a+b
return a
}
}
type OsipsEvent struct {
Name string // Event name
AttrValues map[string]string // Populate AttributeValue pairs here
Values []string // Populate single values here
OriginatorAddress *net.UDPAddr // Address of the entity originating the package
}
func NewEventServer(addrStr string, eventHandlers map[string][]func(*OsipsEvent)) (*OsipsEventServer, error) {
var evSrv *OsipsEventServer
if addr, err := net.ResolveUDPAddr("udp", addrStr); err != nil {
return nil, err
} else if sock, err := net.ListenUDP("udp", addr); err != nil {
return nil, err
} else {
evSrv = &OsipsEventServer{conn: sock, eventsBuffer: bytes.NewBuffer(nil), eventHandlers: eventHandlers}
}
return evSrv, nil
}
// Receives events from OpenSIPS server
type OsipsEventServer struct {
conn *net.UDPConn
eventsBuffer *bytes.Buffer
eventHandlers map[string][]func(*OsipsEvent)
}
func (evSrv *OsipsEventServer) ServeEvents(stopServing chan struct{}) error {
var buf [65457]byte
for {
select {
case <-stopServing: // Break this loop from outside
return nil
default:
evSrv.conn.SetReadDeadline(time.Now().Add(time.Duration(1) * time.Second))
if readBytes, origAddr, err := evSrv.conn.ReadFromUDP(buf[0:]); err != nil {
if e, ok := err.(net.Error); ok && e.Timeout() && readBytes == 0 { // Not real error but our enforcement, continue reading events
continue
}
return err
} else if err := evSrv.processReceivedData(buf[:readBytes], origAddr); err != nil {
return err
}
}
}
}
// Build event type out of received data
func (evSrv *OsipsEventServer) processReceivedData(rcvData []byte, origAddr *net.UDPAddr) error {
if idxEndEvent := bytes.Index(rcvData, []byte("\n\n")); idxEndEvent == -1 { // Could not find event delimiter, something went wrong here
return errors.New("PARSE_ERROR")
} else { // Try generating event out event data, and start fresh a new one after resetting the buffer
endEvent, startNewEvent := rcvData[:idxEndEvent+2], rcvData[idxEndEvent+2:]
if _, err := evSrv.eventsBuffer.Write(endEvent); err != nil { // Possible error here is buffer full
return err
}
if newEvent, err := evSrv.generateEvent(origAddr); err != nil {
return err
} else if err := evSrv.dispatchEvent(newEvent); err != nil {
return err
}
evSrv.eventsBuffer.Reset() // Have finished consuming the previous event data, empty write buffer
if _, err := evSrv.eventsBuffer.Write(startNewEvent); err != nil {
return err
}
}
return nil
}
// Instantiate event
func (evSrv *OsipsEventServer) generateEvent(origAddr *net.UDPAddr) (*OsipsEvent, error) {
ev := &OsipsEvent{AttrValues: make(map[string]string), OriginatorAddress: origAddr}
if eventName, err := evSrv.eventsBuffer.ReadBytes('\n'); err != nil {
return nil, err
} else {
ev.Name = string(eventName[:len(eventName)-1])
}
for {
valByte, err := evSrv.eventsBuffer.ReadBytes('\n')
if err != nil && err == io.EOF {
break
} else if err != nil {
return nil, err
}
valByte = valByte[:len(valByte)-1] // Remove \n in the end
if len(valByte) == 0 { // Have reached second \n, end of event processing
break
}
if idxSep := bytes.Index(valByte, []byte("::")); idxSep == -1 {
ev.Values = append(ev.Values, string(valByte))
} else {
ev.AttrValues[string(valByte[:idxSep])] = string(valByte[idxSep+2:])
}
}
return ev, nil
}
func (evSrv *OsipsEventServer) dispatchEvent(ev *OsipsEvent) error {
if handlers, hasHandler := evSrv.eventHandlers[ev.Name]; hasHandler {
for _, handlerFunc := range handlers {
go handlerFunc(ev)
}
}
return nil
}
func NewOsipsMiDatagramConnector(addrStr string, reconnects int) (*OsipsMiDatagramConnector, error) {
mi := &OsipsMiDatagramConnector{osipsAddr: addrStr, reconnects: reconnects, delayFunc: fib(), connMutex: new(sync.RWMutex)}
if err := mi.connect(); err != nil {
return nil, err
}
return mi, nil
}
// Represents connection to OpenSIPS mi_datagram
type OsipsMiDatagramConnector struct {
osipsAddr string
reconnects int
delayFunc func() int
conn *net.UDPConn
connMutex *sync.RWMutex
}
// Read from network buffer
func (mi *OsipsMiDatagramConnector) readDatagram() ([]byte, error) {
var buf [65457]byte
readBytes, _, err := mi.conn.ReadFromUDP(buf[0:])
if err != nil {
mi.disconnect()
return nil, err
}
return buf[:readBytes], nil
}
func (mi *OsipsMiDatagramConnector) disconnect() {
mi.connMutex.Lock()
defer mi.connMutex.Unlock()
mi.conn.Close()
mi.conn = nil
}
func (mi *OsipsMiDatagramConnector) connected() bool {
mi.connMutex.RLock()
defer mi.connMutex.RUnlock()
return mi.conn != nil
}
// Connect with re-connect and start also listener for inbound replies
func (mi *OsipsMiDatagramConnector) connect() error {
var err error
if mi.connected() {
mi.disconnect()
}
udpAddr, err := net.ResolveUDPAddr("udp4", mi.osipsAddr)
if err != nil {
return err
}
mi.connMutex.Lock()
defer mi.connMutex.Unlock()
mi.conn, err = net.DialUDP("udp", nil, udpAddr)
if err != nil {
return err
}
return nil
}
func (mi *OsipsMiDatagramConnector) reconnectIfNeeded() error {
if mi.connected() { // No need to reconnect
return nil
}
var err error
i := 0
for {
if mi.reconnects != -1 && i >= mi.reconnects { // Maximum reconnects reached, -1 for infinite reconnects
break
}
if err = mi.connect(); err == nil || mi.connected() {
mi.delayFunc = fib() // Reset the reconnect delay
break // No error or unrelated to connection
}
time.Sleep(time.Duration(mi.delayFunc()) * time.Second)
i++
}
if err == nil && !mi.connected() {
return errors.New("NOT_CONNECTED")
}
return err // nil or last error in the loop
}
// Send a command, re-connect in background if needed
func (mi *OsipsMiDatagramConnector) SendCommand(cmd []byte) ([]byte, error) {
if err := mi.reconnectIfNeeded(); err != nil {
return nil, err
}
mi.connMutex.RLock()
if _, err := mi.conn.Write(cmd); err != nil {
mi.connMutex.RUnlock()
return nil, err
}
mi.connMutex.RUnlock()
return mi.readDatagram()
}
// Useful to find out from outside the local IP/Port connected
func (mi *OsipsMiDatagramConnector) LocallAddr() net.Addr {
if !mi.connected() {
return nil
}
mi.connMutex.RLock()
defer mi.connMutex.RUnlock()
return mi.conn.LocalAddr()
}
func NewOsipsMiConPool(address string, reconnects int, maxConnections int) (*OsipsMiConPool, error) {
miPool := &OsipsMiConPool{osipsAddr: address, reconnects: reconnects, mis: make(chan *OsipsMiDatagramConnector, maxConnections)}
for i := 0; i < maxConnections; i++ {
miPool.mis <- nil // Empty instantiate so we do not need to wait later when we pop
}
return miPool, nil
}
type OsipsMiConPool struct {
osipsAddr string
reconnects int
mis chan *OsipsMiDatagramConnector // Here will be a reference towards the available connectors
}
func (mipool *OsipsMiConPool) PopMiConn() (*OsipsMiDatagramConnector, error) {
if mipool == nil {
return nil, errors.New("UNCONFIGURED_OPENSIPS_POOL")
}
var err error
mi := <-mipool.mis
if mi == nil {
mi, err = NewOsipsMiDatagramConnector(mipool.osipsAddr, mipool.reconnects)
if err != nil {
return nil, err
}
return mi, nil
} else {
return mi, nil
}
}
func (mipool *OsipsMiConPool) PushMiConn(mi *OsipsMiDatagramConnector) {
if mi.connected() { // We only add it back if the socket is still connected
mipool.mis <- mi
}
}
func (mipool *OsipsMiConPool) SendCommand(cmd []byte) ([]byte, error) {
miConn, err := mipool.PopMiConn()
if err != nil {
return nil, err
} else {
defer mipool.PushMiConn(miConn)
}
return miConn.SendCommand(cmd)
}