Skip to content

Commit

Permalink
Synchronize PCM and transport IO thread readiness
Browse files Browse the repository at this point in the history
When a client issues PCM open request, it might expect that when the
call returns the BlueALSA server will be ready to serve audio right
away. However, for A2DP Sources the transport acquisition is made
asynchronously and then the audio processing is also started in the
asynchronous manner. All this might contribute to initial lag in the
audio transfer. By adding synchronization between PCM open request
and an IO thread main event loop readiness, we will eliminate this
potential initial lag.
  • Loading branch information
arkq committed Feb 27, 2021
1 parent 087708f commit 42dd5c2
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 92 deletions.
118 changes: 39 additions & 79 deletions src/a2dp-audio.c
Original file line number Diff line number Diff line change
Expand Up @@ -517,10 +517,6 @@ static void *a2dp_sink_sbc(struct ba_transport_thread *th) {
struct io_thread_data io = {
.th = th,
.rtp_seq_number = -1,
/* Lock transport during initialization stage. This lock will ensure,
* that no one will modify critical section until thread state can be
* known - initialization has failed or succeeded. */
.t_locked = !ba_transport_thread_cleanup_lock(th),
};

if (a2dp_validate_bt_sink(t) != 0)
Expand Down Expand Up @@ -554,11 +550,8 @@ static void *a2dp_sink_sbc(struct ba_transport_thread *th) {
uint16_t sbc_bitpool = 0;
#endif

ba_transport_thread_cleanup_unlock(th);
io.t_locked = false;

debug("Starting IO loop: %s", ba_transport_type_to_string(t->type));
for (;;) {
debug_transport_thread_loop(th, "START");
for (ba_transport_thread_ready(th);;) {

ssize_t len;
if ((len = a2dp_poll_and_read_bt(&io, &bt)) <= 0) {
Expand Down Expand Up @@ -611,6 +604,7 @@ static void *a2dp_sink_sbc(struct ba_transport_thread *th) {
}

fail:
debug_transport_thread_loop(th, "EXIT");
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
pthread_cleanup_pop(!io.t_locked);
fail_ffb:
Expand All @@ -631,10 +625,6 @@ static void *a2dp_source_sbc(struct ba_transport_thread *th) {
struct io_thread_data io = {
.th = th,
.timeout = -1,
/* Lock transport during initialization stage. This lock will ensure,
* that no one will modify critical section until thread state can be
* known - initialization has failed or succeeded. */
.t_locked = !ba_transport_thread_cleanup_lock(th),
};

sbc_t sbc;
Expand Down Expand Up @@ -690,11 +680,8 @@ static void *a2dp_source_sbc(struct ba_transport_thread *th) {
uint16_t seq_number = be16toh(rtp_header->seq_number);
uint32_t timestamp = be32toh(rtp_header->timestamp);

ba_transport_thread_cleanup_unlock(th);
io.t_locked = false;

debug("Starting IO loop: %s", ba_transport_type_to_string(t->type));
for (;;) {
debug_transport_thread_loop(th, "START");
for (ba_transport_thread_ready(th);;) {

ssize_t samples;
if ((samples = a2dp_poll_and_read_pcm(&t->a2dp.pcm, &io, &pcm)) <= 0) {
Expand Down Expand Up @@ -764,6 +751,7 @@ static void *a2dp_source_sbc(struct ba_transport_thread *th) {
}

fail:
debug_transport_thread_loop(th, "EXIT");
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
pthread_cleanup_pop(!io.t_locked);
fail_ffb:
Expand All @@ -785,7 +773,6 @@ static void *a2dp_sink_mpeg(struct ba_transport_thread *th) {
struct io_thread_data io = {
.th = th,
.rtp_seq_number = -1,
.t_locked = !ba_transport_thread_cleanup_lock(th),
};

if (a2dp_validate_bt_sink(t) != 0)
Expand Down Expand Up @@ -844,11 +831,8 @@ static void *a2dp_sink_mpeg(struct ba_transport_thread *th) {

pthread_cleanup_push(PTHREAD_CLEANUP(ba_transport_thread_cleanup_lock), th);

ba_transport_thread_cleanup_unlock(th);
io.t_locked = false;

debug("Starting IO loop: %s", ba_transport_type_to_string(t->type));
for (;;) {
debug_transport_thread_loop(th, "START");
for (ba_transport_thread_ready(th);;) {

ssize_t len;
if ((len = a2dp_poll_and_read_bt(&io, &bt)) <= 0) {
Expand Down Expand Up @@ -933,6 +917,7 @@ static void *a2dp_sink_mpeg(struct ba_transport_thread *th) {
}

fail:
debug_transport_thread_loop(th, "EXIT");
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
pthread_cleanup_pop(!io.t_locked);
fail_ffb:
Expand All @@ -958,7 +943,6 @@ static void *a2dp_source_mp3(struct ba_transport_thread *th) {
struct io_thread_data io = {
.th = th,
.timeout = -1,
.t_locked = !ba_transport_thread_cleanup_lock(th),
};

lame_t handle;
Expand Down Expand Up @@ -1069,11 +1053,8 @@ static void *a2dp_source_mp3(struct ba_transport_thread *th) {
uint16_t seq_number = be16toh(rtp_header->seq_number);
uint32_t timestamp = be32toh(rtp_header->timestamp);

ba_transport_thread_cleanup_unlock(th);
io.t_locked = false;

debug("Starting IO loop: %s", ba_transport_type_to_string(t->type));
for (;;) {
debug_transport_thread_loop(th, "START");
for (ba_transport_thread_ready(th);;) {

ssize_t samples;
if ((samples = a2dp_poll_and_read_pcm(&t->a2dp.pcm, &io, &pcm)) <= 0) {
Expand Down Expand Up @@ -1154,6 +1135,7 @@ static void *a2dp_source_mp3(struct ba_transport_thread *th) {
}

fail:
debug_transport_thread_loop(th, "EXIT");
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
pthread_cleanup_pop(!io.t_locked);
fail_ffb:
Expand All @@ -1177,7 +1159,6 @@ static void *a2dp_sink_aac(struct ba_transport_thread *th) {
struct io_thread_data io = {
.th = th,
.rtp_seq_number = -1,
.t_locked = !ba_transport_thread_cleanup_lock(th),
};

if (a2dp_validate_bt_sink(t) != 0)
Expand Down Expand Up @@ -1228,11 +1209,8 @@ static void *a2dp_sink_aac(struct ba_transport_thread *th) {

int markbit_quirk = -3;

ba_transport_thread_cleanup_unlock(th);
io.t_locked = false;

debug("Starting IO loop: %s", ba_transport_type_to_string(t->type));
for (;;) {
debug_transport_thread_loop(th, "START");
for (ba_transport_thread_ready(th);;) {

ssize_t len;
if ((len = a2dp_poll_and_read_bt(&io, &bt)) <= 0) {
Expand Down Expand Up @@ -1302,6 +1280,7 @@ static void *a2dp_sink_aac(struct ba_transport_thread *th) {
}

fail:
debug_transport_thread_loop(th, "EXIT");
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
pthread_cleanup_pop(!io.t_locked);
fail_ffb:
Expand All @@ -1326,7 +1305,6 @@ static void *a2dp_source_aac(struct ba_transport_thread *th) {
struct io_thread_data io = {
.th = th,
.timeout = -1,
.t_locked = !ba_transport_thread_cleanup_lock(th),
};

HANDLE_AACENCODER handle;
Expand Down Expand Up @@ -1462,11 +1440,8 @@ static void *a2dp_source_aac(struct ba_transport_thread *th) {
AACENC_InArgs in_args = { 0 };
AACENC_OutArgs out_args = { 0 };

ba_transport_thread_cleanup_unlock(th);
io.t_locked = false;

debug("Starting IO loop: %s", ba_transport_type_to_string(t->type));
for (;;) {
debug_transport_thread_loop(th, "START");
for (ba_transport_thread_ready(th);;) {

ssize_t samples;
if ((samples = a2dp_poll_and_read_pcm(&t->a2dp.pcm, &io, &pcm)) <= 0) {
Expand Down Expand Up @@ -1543,6 +1518,7 @@ static void *a2dp_source_aac(struct ba_transport_thread *th) {
}

fail:
debug_transport_thread_loop(th, "EXIT");
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
pthread_cleanup_pop(!io.t_locked);
fail_ffb:
Expand All @@ -1565,7 +1541,6 @@ static void *a2dp_sink_aptx(struct ba_transport_thread *th) {
struct ba_transport *t = th->t;
struct io_thread_data io = {
.th = th,
.t_locked = !ba_transport_thread_cleanup_lock(th),
};

if (a2dp_validate_bt_sink(t) != 0)
Expand Down Expand Up @@ -1593,11 +1568,8 @@ static void *a2dp_sink_aptx(struct ba_transport_thread *th) {

pthread_cleanup_push(PTHREAD_CLEANUP(ba_transport_thread_cleanup_lock), th);

ba_transport_thread_cleanup_unlock(th);
io.t_locked = false;

debug("Starting IO loop: %s", ba_transport_type_to_string(t->type));
for (;;) {
debug_transport_thread_loop(th, "START");
for (ba_transport_thread_ready(th);;) {

ssize_t len;
if ((len = a2dp_poll_and_read_bt(&io, &bt)) <= 0) {
Expand Down Expand Up @@ -1635,6 +1607,7 @@ static void *a2dp_sink_aptx(struct ba_transport_thread *th) {
}

fail:
debug_transport_thread_loop(th, "EXIT");
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
pthread_cleanup_pop(!io.t_locked);
fail_ffb:
Expand All @@ -1657,7 +1630,6 @@ static void *a2dp_source_aptx(struct ba_transport_thread *th) {
struct io_thread_data io = {
.th = th,
.timeout = -1,
.t_locked = !ba_transport_thread_cleanup_lock(th),
};

HANDLE_APTX handle;
Expand Down Expand Up @@ -1685,11 +1657,8 @@ static void *a2dp_source_aptx(struct ba_transport_thread *th) {

pthread_cleanup_push(PTHREAD_CLEANUP(ba_transport_thread_cleanup_lock), th);

ba_transport_thread_cleanup_unlock(th);
io.t_locked = false;

debug("Starting IO loop: %s", ba_transport_type_to_string(t->type));
for (;;) {
debug_transport_thread_loop(th, "START");
for (ba_transport_thread_ready(th);;) {

ssize_t samples;
if ((samples = a2dp_poll_and_read_pcm(&t->a2dp.pcm, &io, &pcm)) <= 0) {
Expand Down Expand Up @@ -1753,6 +1722,7 @@ static void *a2dp_source_aptx(struct ba_transport_thread *th) {
}

fail:
debug_transport_thread_loop(th, "EXIT");
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
pthread_cleanup_pop(!io.t_locked);
fail_ffb:
Expand All @@ -1775,7 +1745,6 @@ static void *a2dp_sink_aptx_hd(struct ba_transport_thread *th) {
struct io_thread_data io = {
.th = th,
.rtp_seq_number = -1,
.t_locked = !ba_transport_thread_cleanup_lock(th),
};

if (a2dp_validate_bt_sink(t) != 0)
Expand Down Expand Up @@ -1803,11 +1772,8 @@ static void *a2dp_sink_aptx_hd(struct ba_transport_thread *th) {

pthread_cleanup_push(PTHREAD_CLEANUP(ba_transport_thread_cleanup_lock), th);

ba_transport_thread_cleanup_unlock(th);
io.t_locked = false;

debug("Starting IO loop: %s", ba_transport_type_to_string(t->type));
for (;;) {
debug_transport_thread_loop(th, "START");
for (ba_transport_thread_ready(th);;) {

ssize_t len;
if ((len = a2dp_poll_and_read_bt(&io, &bt)) <= 0) {
Expand Down Expand Up @@ -1850,6 +1816,7 @@ static void *a2dp_sink_aptx_hd(struct ba_transport_thread *th) {
}

fail:
debug_transport_thread_loop(th, "EXIT");
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
pthread_cleanup_pop(!io.t_locked);
fail_ffb:
Expand All @@ -1872,7 +1839,6 @@ static void *a2dp_source_aptx_hd(struct ba_transport_thread *th) {
struct io_thread_data io = {
.th = th,
.timeout = -1,
.t_locked = !ba_transport_thread_cleanup_lock(th),
};

HANDLE_APTX handle;
Expand Down Expand Up @@ -1908,11 +1874,8 @@ static void *a2dp_source_aptx_hd(struct ba_transport_thread *th) {
uint16_t seq_number = be16toh(rtp_header->seq_number);
uint32_t timestamp = be32toh(rtp_header->timestamp);

ba_transport_thread_cleanup_unlock(th);
io.t_locked = false;

debug("Starting IO loop: %s", ba_transport_type_to_string(t->type));
for (;;) {
debug_transport_thread_loop(th, "START");
for (ba_transport_thread_ready(th);;) {

ssize_t samples;
if ((samples = a2dp_poll_and_read_pcm(&t->a2dp.pcm, &io, &pcm)) <= 0) {
Expand Down Expand Up @@ -1984,6 +1947,7 @@ static void *a2dp_source_aptx_hd(struct ba_transport_thread *th) {
}

fail:
debug_transport_thread_loop(th, "EXIT");
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
pthread_cleanup_pop(!io.t_locked);
fail_ffb:
Expand All @@ -2006,7 +1970,6 @@ static void *a2dp_sink_ldac(struct ba_transport_thread *th) {
struct io_thread_data io = {
.th = th,
.rtp_seq_number = -1,
.t_locked = !ba_transport_thread_cleanup_lock(th),
};

if (a2dp_validate_bt_sink(t) != 0)
Expand Down Expand Up @@ -2043,11 +2006,8 @@ static void *a2dp_sink_ldac(struct ba_transport_thread *th) {

pthread_cleanup_push(PTHREAD_CLEANUP(ba_transport_thread_cleanup_lock), th);

ba_transport_thread_cleanup_unlock(th);
io.t_locked = false;

debug("Starting IO loop: %s", ba_transport_type_to_string(t->type));
for (;;) {
debug_transport_thread_loop(th, "START");
for (ba_transport_thread_ready(th);;) {

ssize_t len;
if ((len = a2dp_poll_and_read_bt(&io, &bt)) <= 0) {
Expand Down Expand Up @@ -2092,6 +2052,7 @@ static void *a2dp_sink_ldac(struct ba_transport_thread *th) {
}

fail:
debug_transport_thread_loop(th, "EXIT");
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
pthread_cleanup_pop(!io.t_locked);
fail_ffb:
Expand All @@ -2115,7 +2076,6 @@ static void *a2dp_source_ldac(struct ba_transport_thread *th) {
struct io_thread_data io = {
.th = th,
.timeout = -1,
.t_locked = !ba_transport_thread_cleanup_lock(th),
};

HANDLE_LDAC_BT handle;
Expand Down Expand Up @@ -2178,11 +2138,8 @@ static void *a2dp_source_ldac(struct ba_transport_thread *th) {
uint32_t timestamp = be32toh(rtp_header->timestamp);
size_t ts_frames = 0;

ba_transport_thread_cleanup_unlock(th);
io.t_locked = false;

debug("Starting IO loop: %s", ba_transport_type_to_string(t->type));
for (;;) {
debug_transport_thread_loop(th, "START");
for (ba_transport_thread_ready(th);;) {

ssize_t samples;
if ((samples = a2dp_poll_and_read_pcm(&t->a2dp.pcm, &io, &pcm)) <= 0) {
Expand Down Expand Up @@ -2250,6 +2207,7 @@ static void *a2dp_source_ldac(struct ba_transport_thread *th) {
}

fail:
debug_transport_thread_loop(th, "EXIT");
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
pthread_cleanup_pop(!io.t_locked);
fail_ffb:
Expand Down Expand Up @@ -2301,7 +2259,8 @@ static void *a2dp_sink_dump(struct ba_transport_thread *th) {
goto fail_ffb;
}

for (;;) {
debug_transport_thread_loop(th, "START");
for (ba_transport_thread_ready(th);;) {
ssize_t len;
if ((len = a2dp_poll_and_read_bt(&io, &bt)) <= 0) {
if (len == -1)
Expand All @@ -2313,6 +2272,7 @@ static void *a2dp_sink_dump(struct ba_transport_thread *th) {
}

fail:
debug_transport_thread_loop(th, "EXIT");
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
fail_ffb:
pthread_cleanup_pop(1);
Expand Down

0 comments on commit 42dd5c2

Please sign in to comment.