/
listener.go
290 lines (241 loc) · 6.59 KB
/
listener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
package manager
import (
"bufio"
"context"
"errors"
"html"
"io"
"net/http"
"strconv"
"strings"
"time"
"unicode/utf8"
radio "github.com/R-a-dio/valkyrie"
"github.com/R-a-dio/valkyrie/config"
"github.com/rs/zerolog"
)
const maxMetadataLength = 255 * 16
// Listener listens to an icecast mp3 stream with interleaved song metadata
type Listener struct {
config.Config
// done is closed when run exits, and indicates this listener instance stopped running
done chan struct{}
// cancel is called when Shutdown is called and cancels all operations started by run
cancel context.CancelFunc
// manager is an RPC client to the status manager
manager radio.ManagerService
// prevSong is the last song we got from the stream
prevSong string
}
// NewListener creates a listener and starts running in the background immediately
func NewListener(ctx context.Context, cfg config.Config, m radio.ManagerService) *Listener {
ln := Listener{
Config: cfg,
manager: m,
done: make(chan struct{}),
}
ctx, ln.cancel = context.WithCancel(ctx)
go func() {
defer ln.cancel()
defer close(ln.done)
ln.run(ctx)
}()
return &ln
}
// Shutdown signals the listener to stop running, and waits for it to exit
func (ln *Listener) Shutdown() error {
ln.cancel()
<-ln.done
return nil
}
func (ln *Listener) run(ctx context.Context) {
logger := zerolog.Ctx(ctx)
for {
select {
case <-ctx.Done():
return
default:
}
conn, metasize, err := ln.newConn(ctx)
if err != nil {
logger.Error().Err(err).Msg("connecting")
// wait a bit before retrying the connection
select {
case <-ctx.Done():
case <-time.After(time.Second * 2):
}
continue
}
err = ln.parseResponse(ctx, metasize, conn)
if err != nil {
// log the error, and try reconnecting
logger.Error().Err(err).Msg("connection")
}
}
}
func (ln *Listener) newConn(ctx context.Context) (io.ReadCloser, int, error) {
uri := ln.Conf().Manager.StreamURL
req, err := http.NewRequest("GET", uri, nil)
if err != nil {
return nil, 0, err
}
// we don't want to re-use connections for the audio stream
req.Close = true
// we want interleaved metadata so we have to ask for it
req.Header.Add("Icy-MetaData", "1")
req.Header.Set("User-Agent", ln.Conf().UserAgent)
req = req.WithContext(ctx)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, 0, err
}
// special case for when a fallback isn't setup in icecast; this gives us a 404 status
// code on the mountpoint configured so try and see if we can wake up the streamer by
// sending a fake fallback to the manager
if resp.StatusCode == http.StatusNotFound {
fallbacks := ln.Conf().Manager.FallbackNames
var fallback = "fallback"
if len(fallbacks) > 0 {
fallback = fallbacks[0]
}
ln.manager.UpdateSong(ctx, &radio.SongUpdate{
Song: radio.Song{Metadata: fallback},
Info: radio.SongInfo{
Start: time.Now(),
IsFallback: true,
},
})
}
if resp.StatusCode != 200 {
resp.Body.Close()
return nil, 0, errors.New("listener: request error: " + resp.Status)
}
metasize, err := strconv.Atoi(resp.Header.Get("icy-metaint"))
if err != nil {
resp.Body.Close()
return nil, 0, err
}
return resp.Body, metasize, nil
}
func (ln *Listener) parseResponse(ctx context.Context, metasize int, src io.Reader) error {
r := bufio.NewReader(src)
logger := zerolog.Ctx(ctx)
var meta map[string]string
var buf = make([]byte, metasize)
if metasize <= maxMetadataLength {
// we allocate one extra byte to support semicolon insertion in
// metadata parsing
buf = make([]byte, maxMetadataLength+1)
}
for {
// we first get actual mp3 data from icecast
_, err := io.ReadFull(r, buf[:metasize])
if err != nil {
return err
}
// then we get a single byte indicating metadata length
b, err := r.ReadByte()
if err != nil {
return err
}
// if the length is set to 0 we're not expecting any metadata and can
// read data again
if b == 0 {
continue
}
// else metadata length needs to be multiplied by 16 from the wire
length := int(b * 16)
_, err = io.ReadFull(r, buf[:length])
if err != nil {
return err
}
// now parse the metadata
meta = parseMetadata(buf[:length])
if len(meta) == 0 {
// fatal because it most likely means we've lost sync with the data
// stream and can't find our metadata anymore.
return errors.New("listener: empty metadata: " + string(buf[:length]))
}
song := meta["StreamTitle"]
if song == "" {
logger.Info().Msg("empty metadata")
continue
}
if song == ln.prevSong {
logger.Info().Str("metadata", song).Msg("same metadata")
continue
}
s := radio.Song{
Metadata: strings.TrimSpace(song),
}
info := radio.SongInfo{
Start: time.Now(),
IsFallback: ln.isFallback(song),
}
// set the previous song metadata only if we're not on a fallback to avoid
// stream -> drop (onto fallback) -> stream patterns announcing multiple times
if !info.IsFallback {
ln.prevSong = song
}
go func() {
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
update := &radio.SongUpdate{Song: s, Info: info}
err := ln.manager.UpdateSong(ctx, update)
if err != nil {
logger.Error().Err(err).Msg("updating stream song")
}
}()
}
}
// isFallback checks if the meta passed in matches one of the known fallback
// mountpoint meta as defined with `fallbacknames` in configuration file
func (ln *Listener) isFallback(meta string) bool {
for _, fallback := range ln.Conf().Manager.FallbackNames {
if fallback == meta {
return true
}
}
return false
}
func parseMetadata(b []byte) map[string]string {
var meta = make(map[string]string, 2)
// trim any padding nul bytes and insert a trailing semicolon if one
// doesn't exist yet
for i := len(b) - 1; i > 0; i-- {
if b[i] == '\x00' {
continue
}
if b[i] == ';' {
// already have a trailing semicolon
b = b[:i+1]
break
}
// don't have one, so add one
b = append(b[:i+1], ';')
break
}
for {
var key, value string
b, key = findSequence(b, '=', '\'')
b, value = findSequence(b, '\'', ';')
if key == "" {
break
}
// try and do any html escaping, icecast default configuration will send unicode chars
// as html escaped characters
value = html.UnescapeString(value)
// replace any broken utf8, since other layers expect valid utf8 we do it at the edge
value = strings.ToValidUTF8(value, string(utf8.RuneError))
meta[key] = value
}
return meta
}
func findSequence(seq []byte, a, b byte) ([]byte, string) {
for i := 1; i < len(seq); i++ {
if seq[i-1] == a && seq[i] == b {
return seq[i+1:], string(seq[:i-1])
}
}
return nil, ""
}