-
Notifications
You must be signed in to change notification settings - Fork 73
/
session.go
315 lines (254 loc) · 6.45 KB
/
session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
package play
import (
"context"
"errors"
"sync"
"time"
"github.com/CyCoreSystems/ari"
)
// Session describes a structured Play session.
type Session interface {
// Add appends a set of AudioURIs to a Play session. Note that if the Play session
// has already been completed, this will NOT make it start again.
Add(list ...string)
// Done returns a channel which is closed when the Play session completes
Done() <-chan struct{}
// Err waits for a session to end and returns its error
Err() error
// StopAudio stops the playback of the audio sequence (if there is one), but
// unlike `Stop()`, this does _not_ necessarily terminate the session. If
// the Play session is configured to wait for DTMF following the playback,
// it will still wait after StopAudio() is called.
StopAudio()
// Result waits for a session to end and returns its result
Result() (*Result, error)
// Stop stops a Play session immediately
Stop()
}
type playSession struct {
o *Options
// cancel is the playback context's cancel function
cancel context.CancelFunc
// currentSequence is a pointer to the currently-playing sequence, if there is one
currentSequence *sequence
// digitChan is the channel on which any received DTMF digits will be sent. The received DTMF will also be stored separately, so this channel is primarily for signaling purposes.
digitChan chan string
// closed is a wrapper for done which indicates that done has been closed
closed bool
// done is a channel which is closed when the playback completes execution
done chan struct{}
// mu provides locking for concurrency-related datastructures within the options
mu sync.Mutex
// result is the final result of the playback
result *Result
}
type nilSession struct {
res *Result
}
func (n *nilSession) Add(list ...string) {
}
func (n *nilSession) Done() <-chan struct{} {
ch := make(chan struct{})
close(ch)
return ch
}
func (n *nilSession) Err() error {
return n.res.Error
}
func (n *nilSession) StopAudio() {
}
func (n *nilSession) Result() (*Result, error) {
return n.res, n.res.Error
}
func (n *nilSession) Stop() {
}
func errorSession(err error) *nilSession {
s := &nilSession{
res: new(Result),
}
s.res.Error = err
s.res.Status = Failed
return s
}
func newPlaySession(o *Options) *playSession {
return &playSession{
o: o,
result: new(Result),
digitChan: make(chan string, DigitBufferSize),
done: make(chan struct{}),
}
}
func (s *playSession) play(ctx context.Context, p ari.Player) {
ctx, cancel := context.WithCancel(ctx)
s.cancel = cancel
defer s.Stop()
if s.result == nil {
s.result = new(Result)
}
if s.o.uriList == nil || s.o.uriList.Empty() {
s.result.Error = errors.New("empty playback URI list")
return
}
// cancel if we go over the maximum time
go s.watchMaxTime(ctx)
// Listen for DTMF
go s.listenDTMF(ctx, p)
for i := 0; i < s.o.maxReplays+1; i++ {
if ctx.Err() != nil {
break
}
// Reset the digit cache
s.result.mu.Lock()
s.result.DTMF = ""
s.result.mu.Unlock()
// Play the sequence of audio URIs
s.playSequence(ctx, p)
if s.result.Error != nil {
return
}
// Wait for digits in the silence after the playback sequence completes
s.waitDigits(ctx)
}
}
// playSequence plays the complete audio sequence
func (s *playSession) playSequence(ctx context.Context, p ari.Player) {
seq := newSequence(s)
s.mu.Lock()
s.currentSequence = seq
s.mu.Unlock()
go seq.Play(ctx, p)
// Wait for sequence playback to complete (or context closure to be caught)
select {
case <-ctx.Done():
case <-seq.Done():
if s.result.Status == InProgress {
s.result.Status = Finished
}
}
// Stop audio playback if it is still running
seq.Stop()
// wait for cleanup of sequence so we can get the proper error result
<-seq.Done()
}
// nolint: gocyclo
func (s *playSession) waitDigits(ctx context.Context) {
overallTimer := time.NewTimer(s.o.overallDigitTimeout)
defer overallTimer.Stop()
digitTimeout := s.o.firstDigitTimeout
for {
select {
case <-ctx.Done():
return
case <-time.After(digitTimeout):
return
case <-overallTimer.C:
return
case <-s.digitChan:
if len(s.result.DTMF) > 0 {
digitTimeout = s.o.interDigitTimeout
}
// Determine if a match was found
if s.o.matchFunc != nil {
s.result.mu.Lock()
s.result.DTMF, s.result.MatchResult = s.o.matchFunc(s.result.DTMF)
s.result.mu.Unlock()
switch s.result.MatchResult {
case Complete:
// If we have a complete response, close the entire playback
// and return
s.Stop()
return
case Invalid:
// If invalid, return without waiting
// for any more digits
return
default:
// Incomplete means we should wait for more
}
}
}
}
}
// Stop terminates the execution of a playback
func (s *playSession) Stop() {
if s.result == nil {
s.result = new(Result)
}
// Stop any audio which is still playing
if s.currentSequence != nil {
s.currentSequence.Stop()
<-s.currentSequence.Done()
}
// If we have no other status set, set it to Cancelled
if s.result.Status == InProgress {
s.result.Status = Cancelled
}
// Close out anything else
if s.cancel != nil {
s.cancel()
}
if !s.closed {
s.closed = true
close(s.done)
}
}
func (s *playSession) watchMaxTime(ctx context.Context) {
select {
case <-ctx.Done():
return
case <-time.After(s.o.maxPlaybackTime):
s.Stop()
}
}
func (s *playSession) listenDTMF(ctx context.Context, p ari.Player) {
sub := p.Subscribe(ari.Events.ChannelDtmfReceived)
defer sub.Cancel()
for {
select {
case <-ctx.Done():
return
case e := <-sub.Events():
if e == nil {
return
}
v, ok := e.(*ari.ChannelDtmfReceived)
if !ok {
continue
}
s.result.mu.Lock()
s.result.DTMF += v.Digit
s.result.mu.Unlock()
// Signal receipt of digit, but never block in doing so
select {
case s.digitChan <- v.Digit:
default:
}
// If we have a MatchFunc, stop any playing audio
if s.o.matchFunc != nil && s.currentSequence != nil {
s.currentSequence.Stop()
}
}
}
}
func (s *playSession) Add(list ...string) {
for _, i := range list {
s.o.uriList.Add(i)
}
}
func (s *playSession) Done() <-chan struct{} {
return s.done
}
func (s *playSession) Err() error {
<-s.Done()
return s.result.Error
}
func (s *playSession) StopAudio() {
if s.currentSequence != nil {
s.currentSequence.Stop()
<-s.currentSequence.Done()
}
}
func (s *playSession) Result() (*Result, error) {
<-s.Done()
return s.result, s.result.Error
}