-
Notifications
You must be signed in to change notification settings - Fork 0
/
threaded.rs
177 lines (154 loc) · 6.28 KB
/
threaded.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
mod util;
mod wav;
use libpulse_binding::context::{self, Context, FlagSet as ContextFlagSet};
use libpulse_binding::mainloop::threaded::Mainloop;
use libpulse_binding::proplist::Proplist;
use libpulse_binding::stream::{self, SeekMode, Stream};
use std::cell::RefCell;
use std::error::Error;
use std::ops::Deref;
use std::rc::Rc;
// NOTE: most methods of checking states and using callbacks are taken from the libpulse_binding docs
// see: https://docs.rs/libpulse-binding/2.26.0/libpulse_binding/mainloop/threaded/index.html
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// create threaded main loop
let main_loop = Rc::new(RefCell::new(
Mainloop::new().ok_or("Failed to create mainloop")?,
));
// create pulse context
let props = Proplist::new().ok_or("Failed to create PulseAudio Proplist")?;
let pa_ctx = Rc::new(RefCell::new(
Context::new_with_proplist(main_loop.borrow().deref(), "app_name", &props)
.ok_or("Failed to create PulseAudio context")?,
));
// Context state change callback
{
let ml_ref = Rc::clone(&main_loop);
let context_ref = Rc::clone(&pa_ctx);
pa_ctx
.borrow_mut()
.set_state_callback(Some(Box::new(move || {
let state = unsafe { (*context_ref.as_ptr()).get_state() };
match state {
context::State::Ready
| context::State::Failed
| context::State::Terminated => unsafe {
(*ml_ref.as_ptr()).signal(false);
},
_ => {}
}
})));
}
// connect context to pulse main loop
pa_ctx
.borrow_mut()
.connect(None, ContextFlagSet::NOFLAGS, None)?;
main_loop.borrow_mut().lock();
main_loop.borrow_mut().start()?;
// Wait for context to be ready
loop {
match pa_ctx.borrow().get_state() {
context::State::Ready => {
break;
}
context::State::Failed | context::State::Terminated => {
main_loop.borrow_mut().unlock();
main_loop.borrow_mut().stop();
return Err("Context state failed/terminated, quitting...".into());
}
_ => {
main_loop.borrow_mut().wait();
}
}
}
pa_ctx.borrow_mut().set_state_callback(None);
// setup simple channel to be notified when the stream is ready --------------------------------
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
// create a pulse stream -----------------------------------------------------------------------
{
let (spec, audio_data) = wav::read_wav_file().await?;
let audio_data_len = audio_data.len();
// create pulse stream
let stream = match Stream::new(&mut pa_ctx.borrow_mut(), "SAMPLE_NAME", &spec, None) {
Some(stream) => Rc::new(RefCell::new(stream)),
None => panic!("failed to create new stream"),
};
// Stream state change callback
{
let ml_ref = Rc::clone(&main_loop);
let stream_ref = Rc::clone(&stream);
stream
.borrow_mut()
.set_state_callback(Some(Box::new(move || {
let state = unsafe { (*stream_ref.as_ptr()).get_state() };
match state {
stream::State::Ready
| stream::State::Failed
| stream::State::Terminated => unsafe {
(*ml_ref.as_ptr()).signal(false);
},
_ => {}
}
})));
}
// Stream write callback
{
let stream_ref = Rc::clone(&stream);
let mut bytes_written = 0;
stream
.borrow_mut()
.set_write_callback(Some(Box::new(move |len| {
stream_ref
.borrow_mut()
.write(&audio_data, None, 0, SeekMode::Relative)
.expect("failed to write to stream");
bytes_written += len;
if bytes_written == audio_data.len() {
// FIXME !!!! FIXME !!!!
// a segmentation fault occurs when calling `.set_write_callback` here
if util::should_unset_write_callback() {
stream_ref.borrow_mut().set_write_callback(None);
}
// we're done writing the audio data, tell the server to convert this stream to a sample
stream_ref
.borrow_mut()
.finish_upload()
.expect("failed to finish upload");
tx.send(()).unwrap();
}
})));
}
// connect the stream as an upload, which sends it to the audio server instead of playing it directly
stream.borrow_mut().connect_upload(audio_data_len)?;
// Wait for stream to be ready
loop {
// intentionally wrapping in a block to prevent multiple borrows from occurring
// (from within the write callback)
let stream_state = { stream.borrow().get_state() };
match stream_state {
stream::State::Ready => {
break;
}
stream::State::Failed | stream::State::Terminated => {
main_loop.borrow_mut().unlock();
main_loop.borrow_mut().stop();
return Err("Stream state failed/terminated, quitting...".into());
}
_ => {
main_loop.borrow_mut().wait();
}
}
}
stream.borrow_mut().set_state_callback(None);
}
main_loop.borrow_mut().unlock();
// play the sample
while let Some(()) = rx.recv().await {
pa_ctx
.borrow_mut()
.play_sample("SAMPLE_NAME", None, None, None);
}
// block forever since pulse main loop is running and we don't want to stop it
futures::future::pending::<Result<(), Box<dyn Error>>>().await
}