diff --git a/src/a2dp-audio.c b/src/a2dp-audio.c index 635b5927b..a8e810000 100644 --- a/src/a2dp-audio.c +++ b/src/a2dp-audio.c @@ -517,10 +517,6 @@ static void *a2dp_sink_sbc(struct ba_transport_thread *th) { struct io_thread_data io = { .th = th, .rtp_seq_number = -1, - /* Lock transport during initialization stage. This lock will ensure, - * that no one will modify critical section until thread state can be - * known - initialization has failed or succeeded. */ - .t_locked = !ba_transport_thread_cleanup_lock(th), }; if (a2dp_validate_bt_sink(t) != 0) @@ -554,11 +550,8 @@ static void *a2dp_sink_sbc(struct ba_transport_thread *th) { uint16_t sbc_bitpool = 0; #endif - ba_transport_thread_cleanup_unlock(th); - io.t_locked = false; - - debug("Starting IO loop: %s", ba_transport_type_to_string(t->type)); - for (;;) { + debug_transport_thread_loop(th, "START"); + for (ba_transport_thread_ready(th);;) { ssize_t len; if ((len = a2dp_poll_and_read_bt(&io, &bt)) <= 0) { @@ -611,6 +604,7 @@ static void *a2dp_sink_sbc(struct ba_transport_thread *th) { } fail: + debug_transport_thread_loop(th, "EXIT"); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_cleanup_pop(!io.t_locked); fail_ffb: @@ -631,10 +625,6 @@ static void *a2dp_source_sbc(struct ba_transport_thread *th) { struct io_thread_data io = { .th = th, .timeout = -1, - /* Lock transport during initialization stage. This lock will ensure, - * that no one will modify critical section until thread state can be - * known - initialization has failed or succeeded. */ - .t_locked = !ba_transport_thread_cleanup_lock(th), }; sbc_t sbc; @@ -690,11 +680,8 @@ static void *a2dp_source_sbc(struct ba_transport_thread *th) { uint16_t seq_number = be16toh(rtp_header->seq_number); uint32_t timestamp = be32toh(rtp_header->timestamp); - ba_transport_thread_cleanup_unlock(th); - io.t_locked = false; - - debug("Starting IO loop: %s", ba_transport_type_to_string(t->type)); - for (;;) { + debug_transport_thread_loop(th, "START"); + for (ba_transport_thread_ready(th);;) { ssize_t samples; if ((samples = a2dp_poll_and_read_pcm(&t->a2dp.pcm, &io, &pcm)) <= 0) { @@ -764,6 +751,7 @@ static void *a2dp_source_sbc(struct ba_transport_thread *th) { } fail: + debug_transport_thread_loop(th, "EXIT"); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_cleanup_pop(!io.t_locked); fail_ffb: @@ -785,7 +773,6 @@ static void *a2dp_sink_mpeg(struct ba_transport_thread *th) { struct io_thread_data io = { .th = th, .rtp_seq_number = -1, - .t_locked = !ba_transport_thread_cleanup_lock(th), }; if (a2dp_validate_bt_sink(t) != 0) @@ -844,11 +831,8 @@ static void *a2dp_sink_mpeg(struct ba_transport_thread *th) { pthread_cleanup_push(PTHREAD_CLEANUP(ba_transport_thread_cleanup_lock), th); - ba_transport_thread_cleanup_unlock(th); - io.t_locked = false; - - debug("Starting IO loop: %s", ba_transport_type_to_string(t->type)); - for (;;) { + debug_transport_thread_loop(th, "START"); + for (ba_transport_thread_ready(th);;) { ssize_t len; if ((len = a2dp_poll_and_read_bt(&io, &bt)) <= 0) { @@ -933,6 +917,7 @@ static void *a2dp_sink_mpeg(struct ba_transport_thread *th) { } fail: + debug_transport_thread_loop(th, "EXIT"); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_cleanup_pop(!io.t_locked); fail_ffb: @@ -958,7 +943,6 @@ static void *a2dp_source_mp3(struct ba_transport_thread *th) { struct io_thread_data io = { .th = th, .timeout = -1, - .t_locked = !ba_transport_thread_cleanup_lock(th), }; lame_t handle; @@ -1069,11 +1053,8 @@ static void *a2dp_source_mp3(struct ba_transport_thread *th) { uint16_t seq_number = be16toh(rtp_header->seq_number); uint32_t timestamp = be32toh(rtp_header->timestamp); - ba_transport_thread_cleanup_unlock(th); - io.t_locked = false; - - debug("Starting IO loop: %s", ba_transport_type_to_string(t->type)); - for (;;) { + debug_transport_thread_loop(th, "START"); + for (ba_transport_thread_ready(th);;) { ssize_t samples; if ((samples = a2dp_poll_and_read_pcm(&t->a2dp.pcm, &io, &pcm)) <= 0) { @@ -1154,6 +1135,7 @@ static void *a2dp_source_mp3(struct ba_transport_thread *th) { } fail: + debug_transport_thread_loop(th, "EXIT"); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_cleanup_pop(!io.t_locked); fail_ffb: @@ -1177,7 +1159,6 @@ static void *a2dp_sink_aac(struct ba_transport_thread *th) { struct io_thread_data io = { .th = th, .rtp_seq_number = -1, - .t_locked = !ba_transport_thread_cleanup_lock(th), }; if (a2dp_validate_bt_sink(t) != 0) @@ -1228,11 +1209,8 @@ static void *a2dp_sink_aac(struct ba_transport_thread *th) { int markbit_quirk = -3; - ba_transport_thread_cleanup_unlock(th); - io.t_locked = false; - - debug("Starting IO loop: %s", ba_transport_type_to_string(t->type)); - for (;;) { + debug_transport_thread_loop(th, "START"); + for (ba_transport_thread_ready(th);;) { ssize_t len; if ((len = a2dp_poll_and_read_bt(&io, &bt)) <= 0) { @@ -1302,6 +1280,7 @@ static void *a2dp_sink_aac(struct ba_transport_thread *th) { } fail: + debug_transport_thread_loop(th, "EXIT"); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_cleanup_pop(!io.t_locked); fail_ffb: @@ -1326,7 +1305,6 @@ static void *a2dp_source_aac(struct ba_transport_thread *th) { struct io_thread_data io = { .th = th, .timeout = -1, - .t_locked = !ba_transport_thread_cleanup_lock(th), }; HANDLE_AACENCODER handle; @@ -1462,11 +1440,8 @@ static void *a2dp_source_aac(struct ba_transport_thread *th) { AACENC_InArgs in_args = { 0 }; AACENC_OutArgs out_args = { 0 }; - ba_transport_thread_cleanup_unlock(th); - io.t_locked = false; - - debug("Starting IO loop: %s", ba_transport_type_to_string(t->type)); - for (;;) { + debug_transport_thread_loop(th, "START"); + for (ba_transport_thread_ready(th);;) { ssize_t samples; if ((samples = a2dp_poll_and_read_pcm(&t->a2dp.pcm, &io, &pcm)) <= 0) { @@ -1543,6 +1518,7 @@ static void *a2dp_source_aac(struct ba_transport_thread *th) { } fail: + debug_transport_thread_loop(th, "EXIT"); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_cleanup_pop(!io.t_locked); fail_ffb: @@ -1565,7 +1541,6 @@ static void *a2dp_sink_aptx(struct ba_transport_thread *th) { struct ba_transport *t = th->t; struct io_thread_data io = { .th = th, - .t_locked = !ba_transport_thread_cleanup_lock(th), }; if (a2dp_validate_bt_sink(t) != 0) @@ -1593,11 +1568,8 @@ static void *a2dp_sink_aptx(struct ba_transport_thread *th) { pthread_cleanup_push(PTHREAD_CLEANUP(ba_transport_thread_cleanup_lock), th); - ba_transport_thread_cleanup_unlock(th); - io.t_locked = false; - - debug("Starting IO loop: %s", ba_transport_type_to_string(t->type)); - for (;;) { + debug_transport_thread_loop(th, "START"); + for (ba_transport_thread_ready(th);;) { ssize_t len; if ((len = a2dp_poll_and_read_bt(&io, &bt)) <= 0) { @@ -1635,6 +1607,7 @@ static void *a2dp_sink_aptx(struct ba_transport_thread *th) { } fail: + debug_transport_thread_loop(th, "EXIT"); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_cleanup_pop(!io.t_locked); fail_ffb: @@ -1657,7 +1630,6 @@ static void *a2dp_source_aptx(struct ba_transport_thread *th) { struct io_thread_data io = { .th = th, .timeout = -1, - .t_locked = !ba_transport_thread_cleanup_lock(th), }; HANDLE_APTX handle; @@ -1685,11 +1657,8 @@ static void *a2dp_source_aptx(struct ba_transport_thread *th) { pthread_cleanup_push(PTHREAD_CLEANUP(ba_transport_thread_cleanup_lock), th); - ba_transport_thread_cleanup_unlock(th); - io.t_locked = false; - - debug("Starting IO loop: %s", ba_transport_type_to_string(t->type)); - for (;;) { + debug_transport_thread_loop(th, "START"); + for (ba_transport_thread_ready(th);;) { ssize_t samples; if ((samples = a2dp_poll_and_read_pcm(&t->a2dp.pcm, &io, &pcm)) <= 0) { @@ -1753,6 +1722,7 @@ static void *a2dp_source_aptx(struct ba_transport_thread *th) { } fail: + debug_transport_thread_loop(th, "EXIT"); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_cleanup_pop(!io.t_locked); fail_ffb: @@ -1775,7 +1745,6 @@ static void *a2dp_sink_aptx_hd(struct ba_transport_thread *th) { struct io_thread_data io = { .th = th, .rtp_seq_number = -1, - .t_locked = !ba_transport_thread_cleanup_lock(th), }; if (a2dp_validate_bt_sink(t) != 0) @@ -1803,11 +1772,8 @@ static void *a2dp_sink_aptx_hd(struct ba_transport_thread *th) { pthread_cleanup_push(PTHREAD_CLEANUP(ba_transport_thread_cleanup_lock), th); - ba_transport_thread_cleanup_unlock(th); - io.t_locked = false; - - debug("Starting IO loop: %s", ba_transport_type_to_string(t->type)); - for (;;) { + debug_transport_thread_loop(th, "START"); + for (ba_transport_thread_ready(th);;) { ssize_t len; if ((len = a2dp_poll_and_read_bt(&io, &bt)) <= 0) { @@ -1850,6 +1816,7 @@ static void *a2dp_sink_aptx_hd(struct ba_transport_thread *th) { } fail: + debug_transport_thread_loop(th, "EXIT"); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_cleanup_pop(!io.t_locked); fail_ffb: @@ -1872,7 +1839,6 @@ static void *a2dp_source_aptx_hd(struct ba_transport_thread *th) { struct io_thread_data io = { .th = th, .timeout = -1, - .t_locked = !ba_transport_thread_cleanup_lock(th), }; HANDLE_APTX handle; @@ -1908,11 +1874,8 @@ static void *a2dp_source_aptx_hd(struct ba_transport_thread *th) { uint16_t seq_number = be16toh(rtp_header->seq_number); uint32_t timestamp = be32toh(rtp_header->timestamp); - ba_transport_thread_cleanup_unlock(th); - io.t_locked = false; - - debug("Starting IO loop: %s", ba_transport_type_to_string(t->type)); - for (;;) { + debug_transport_thread_loop(th, "START"); + for (ba_transport_thread_ready(th);;) { ssize_t samples; if ((samples = a2dp_poll_and_read_pcm(&t->a2dp.pcm, &io, &pcm)) <= 0) { @@ -1984,6 +1947,7 @@ static void *a2dp_source_aptx_hd(struct ba_transport_thread *th) { } fail: + debug_transport_thread_loop(th, "EXIT"); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_cleanup_pop(!io.t_locked); fail_ffb: @@ -2006,7 +1970,6 @@ static void *a2dp_sink_ldac(struct ba_transport_thread *th) { struct io_thread_data io = { .th = th, .rtp_seq_number = -1, - .t_locked = !ba_transport_thread_cleanup_lock(th), }; if (a2dp_validate_bt_sink(t) != 0) @@ -2043,11 +2006,8 @@ static void *a2dp_sink_ldac(struct ba_transport_thread *th) { pthread_cleanup_push(PTHREAD_CLEANUP(ba_transport_thread_cleanup_lock), th); - ba_transport_thread_cleanup_unlock(th); - io.t_locked = false; - - debug("Starting IO loop: %s", ba_transport_type_to_string(t->type)); - for (;;) { + debug_transport_thread_loop(th, "START"); + for (ba_transport_thread_ready(th);;) { ssize_t len; if ((len = a2dp_poll_and_read_bt(&io, &bt)) <= 0) { @@ -2092,6 +2052,7 @@ static void *a2dp_sink_ldac(struct ba_transport_thread *th) { } fail: + debug_transport_thread_loop(th, "EXIT"); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_cleanup_pop(!io.t_locked); fail_ffb: @@ -2115,7 +2076,6 @@ static void *a2dp_source_ldac(struct ba_transport_thread *th) { struct io_thread_data io = { .th = th, .timeout = -1, - .t_locked = !ba_transport_thread_cleanup_lock(th), }; HANDLE_LDAC_BT handle; @@ -2178,11 +2138,8 @@ static void *a2dp_source_ldac(struct ba_transport_thread *th) { uint32_t timestamp = be32toh(rtp_header->timestamp); size_t ts_frames = 0; - ba_transport_thread_cleanup_unlock(th); - io.t_locked = false; - - debug("Starting IO loop: %s", ba_transport_type_to_string(t->type)); - for (;;) { + debug_transport_thread_loop(th, "START"); + for (ba_transport_thread_ready(th);;) { ssize_t samples; if ((samples = a2dp_poll_and_read_pcm(&t->a2dp.pcm, &io, &pcm)) <= 0) { @@ -2250,6 +2207,7 @@ static void *a2dp_source_ldac(struct ba_transport_thread *th) { } fail: + debug_transport_thread_loop(th, "EXIT"); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_cleanup_pop(!io.t_locked); fail_ffb: @@ -2301,7 +2259,8 @@ static void *a2dp_sink_dump(struct ba_transport_thread *th) { goto fail_ffb; } - for (;;) { + debug_transport_thread_loop(th, "START"); + for (ba_transport_thread_ready(th);;) { ssize_t len; if ((len = a2dp_poll_and_read_bt(&io, &bt)) <= 0) { if (len == -1) @@ -2313,6 +2272,7 @@ static void *a2dp_sink_dump(struct ba_transport_thread *th) { } fail: + debug_transport_thread_loop(th, "EXIT"); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); fail_ffb: pthread_cleanup_pop(1); diff --git a/src/ba-transport.c b/src/ba-transport.c index 6da619fd4..f24e46534 100644 --- a/src/ba-transport.c +++ b/src/ba-transport.c @@ -111,6 +111,8 @@ static int transport_thread_init( th->pipe[1] = -1; pthread_mutex_init(&th->mutex, NULL); + pthread_mutex_init(&th->ready_mtx, NULL); + pthread_cond_init(&th->ready, NULL); if (pipe(th->pipe) == -1) return -1; @@ -136,6 +138,7 @@ static void transport_thread_cancel(struct ba_transport_thread *th) { * make sure, that after termination, this thread handler will not * be used anymore. */ th->id = config.main_thread; + th->running = false; } @@ -148,6 +151,8 @@ static void transport_thread_free( if (th->pipe[1] != -1) close(th->pipe[1]); pthread_mutex_destroy(&th->mutex); + pthread_mutex_destroy(&th->ready_mtx); + pthread_cond_destroy(&th->ready); } /** @@ -1026,6 +1031,7 @@ int ba_transport_thread_create( int ret; ba_transport_ref(t); + if ((ret = pthread_create(&th->id, NULL, PTHREAD_ROUTINE(routine), th)) != 0) { error("Couldn't create transport thread: %s", strerror(ret)); th->id = config.main_thread; @@ -1040,6 +1046,13 @@ int ba_transport_thread_create( return 0; } +int ba_transport_thread_ready( + struct ba_transport_thread *th) { + th->running = true; + pthread_cond_signal(&th->ready); + return 0; +} + int ba_transport_thread_send_signal( struct ba_transport_thread *th, enum ba_transport_signal sig) { diff --git a/src/ba-transport.h b/src/ba-transport.h index 0fe213988..83e733048 100644 --- a/src/ba-transport.h +++ b/src/ba-transport.h @@ -144,7 +144,7 @@ struct ba_transport_pcm { struct ba_transport_thread { /* backward reference to transport */ struct ba_transport *t; - /* guard PCM running on this thread */ + /* guard thread structure */ pthread_mutex_t mutex; /* actual thread ID */ pthread_t id; @@ -152,6 +152,10 @@ struct ba_transport_thread { int pipe[2]; /* indicates cleanup lock */ bool cleanup_lock; + /* thread synchronization */ + pthread_mutex_t ready_mtx; + pthread_cond_t ready; + bool running; }; struct ba_transport { @@ -311,6 +315,9 @@ int ba_transport_thread_create( void *(*routine)(struct ba_transport_thread *), const char *name); +int ba_transport_thread_ready( + struct ba_transport_thread *th); + int ba_transport_thread_send_signal( struct ba_transport_thread *th, enum ba_transport_signal sig); @@ -321,4 +328,7 @@ void ba_transport_thread_cleanup(struct ba_transport_thread *th); int ba_transport_thread_cleanup_lock(struct ba_transport_thread *th); int ba_transport_thread_cleanup_unlock(struct ba_transport_thread *th); +#define debug_transport_thread_loop(th, tag) \ + debug("IO loop: %s: %s: %s", tag, __func__, ba_transport_type_to_string((th)->t->type)) + #endif diff --git a/src/bluealsa-dbus.c b/src/bluealsa-dbus.c index 36f99ccce..78a6353e6 100644 --- a/src/bluealsa-dbus.c +++ b/src/bluealsa-dbus.c @@ -342,13 +342,17 @@ static void bluealsa_pcm_open(GDBusMethodInvocation *inv) { struct ba_transport_thread *th = pcm->th; struct ba_transport *t = pcm->t; int pcm_fds[4] = { -1, -1, -1, -1 }; - bool locked = false; size_t i; /* Prevent two (or more) clients trying to * open the same PCM at the same time. */ pthread_mutex_lock(&pcm->dbus_mtx); + /* We must ensure that transport release is not in progress before + * accessing transport critical section. Otherwise, we might have + * the IO thread closing it in the middle of the open procedure! */ + ba_transport_thread_cleanup_lock(th); + /* preliminary check whether HFP codes is selected */ if (t->type.profile & BA_TRANSPORT_PROFILE_MASK_SCO && t->type.codec == HFP_CODEC_UNDEFINED) { @@ -357,12 +361,6 @@ static void bluealsa_pcm_open(GDBusMethodInvocation *inv) { goto fail; } - /* We must ensure that transport release is not in progress before - * accessing transport critical section. Otherwise, we might have - * the IO thread close it in the middle of open procedure! */ - ba_transport_thread_cleanup_lock(th); - locked = true; - if (pcm->fd != -1) { g_dbus_method_invocation_return_error(inv, G_DBUS_ERROR, G_DBUS_ERROR_FAILED, "%s", strerror(EBUSY)); @@ -411,6 +409,16 @@ static void bluealsa_pcm_open(GDBusMethodInvocation *inv) { /* notify our audio thread that the FIFO is ready */ ba_transport_thread_send_signal(th, BA_TRANSPORT_SIGNAL_PCM_OPEN); + /* For source profiles (A2DP Source and SCO Audio Gateway) wait + * until the underlying IO thread is ready to process audio. */ + if (t->type.profile & BA_TRANSPORT_PROFILE_A2DP_SOURCE || + t->type.profile & BA_TRANSPORT_PROFILE_MASK_AG) { + pthread_mutex_lock(&th->ready_mtx); + while (!th->running) + pthread_cond_wait(&th->ready, &th->ready_mtx); + pthread_mutex_unlock(&th->ready_mtx); + } + ba_transport_thread_cleanup_unlock(th); pthread_mutex_unlock(&pcm->dbus_mtx); ba_transport_pcm_unref(pcm); @@ -424,8 +432,7 @@ static void bluealsa_pcm_open(GDBusMethodInvocation *inv) { return; fail: - if (locked) - ba_transport_thread_cleanup_unlock(th); + ba_transport_thread_cleanup_unlock(th); pthread_mutex_unlock(&pcm->dbus_mtx); ba_transport_pcm_unref(pcm); /* clean up created file descriptors */ diff --git a/src/sco.c b/src/sco.c index 984f1c0bb..38db8b7d8 100644 --- a/src/sco.c +++ b/src/sco.c @@ -242,8 +242,8 @@ void *sco_thread(struct ba_transport_thread *th) { { -1, POLLOUT, 0 }, }; - debug("Starting SCO loop: %s", ba_transport_type_to_string(t->type)); - for (;;) { + debug_transport_thread_loop(th, "START"); + for (ba_transport_thread_ready(th);;) { /* prevent an unexpected change of the codec value */ const uint16_t codec = t->type.codec; @@ -576,6 +576,7 @@ void *sco_thread(struct ba_transport_thread *th) { } fail: + debug_transport_thread_loop(th, "EXIT"); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); fail_ffb: #if ENABLE_MSBC diff --git a/test/bluealsa-mock.c b/test/bluealsa-mock.c index 55093f7da..32794062d 100644 --- a/test/bluealsa-mock.c +++ b/test/bluealsa-mock.c @@ -146,6 +146,9 @@ static void *mock_a2dp_sink(struct ba_transport_thread *th) { bool io_paused = false; int x = 0; + debug_transport_thread_loop(th, "START"); + ba_transport_thread_ready(th); + while (sigusr1_count == 0) { int timout = 0; diff --git a/test/test-io.c b/test/test-io.c index dfeadf894..b007008f0 100644 --- a/test/test-io.c +++ b/test/test-io.c @@ -246,6 +246,9 @@ static void *test_io_thread_a2dp_dump_bt(struct ba_transport_thread *th) { uint8_t buffer[1024]; ssize_t len; + debug_transport_thread_loop(th, "START"); + ba_transport_thread_ready(th); + while (poll(pfds, ARRAYSIZE(pfds), 500) > 0) { if ((len = read(pfds[0].fd, buffer, sizeof(buffer))) == -1) { @@ -282,7 +285,9 @@ static void *test_io_thread_a2dp_dump_pcm(struct ba_transport_thread *th) { ck_assert_ptr_ne(f = fopen(fname, "w"), NULL); } - debug("Starting PCM dump: %d", pfds[0].fd); + debug_transport_thread_loop(th, "START"); + ba_transport_thread_ready(th); + while (poll(pfds, ARRAYSIZE(pfds), 500) > 0) { if ((len = read(pfds[0].fd, buffer, sizeof(buffer))) == -1) {