From 42dd5c26fd94152e58de1be38e6f27b0d0fa93a6 Mon Sep 17 00:00:00 2001 From: Arkadiusz Bokowy Date: Fri, 26 Feb 2021 20:47:57 +0100 Subject: [PATCH] Synchronize PCM and transport IO thread readiness When a client issues PCM open request, it might expect that when the call returns the BlueALSA server will be ready to serve audio right away. However, for A2DP Sources the transport acquisition is made asynchronously and then the audio processing is also started in the asynchronous manner. All this might contribute to initial lag in the audio transfer. By adding synchronization between PCM open request and an IO thread main event loop readiness, we will eliminate this potential initial lag. --- src/a2dp-audio.c | 118 ++++++++++++++----------------------------- src/ba-transport.c | 13 +++++ src/ba-transport.h | 12 ++++- src/bluealsa-dbus.c | 25 +++++---- src/sco.c | 5 +- test/bluealsa-mock.c | 3 ++ test/test-io.c | 7 ++- 7 files changed, 91 insertions(+), 92 deletions(-) diff --git a/src/a2dp-audio.c b/src/a2dp-audio.c index 635b5927b..a8e810000 100644 --- a/src/a2dp-audio.c +++ b/src/a2dp-audio.c @@ -517,10 +517,6 @@ static void *a2dp_sink_sbc(struct ba_transport_thread *th) { struct io_thread_data io = { .th = th, .rtp_seq_number = -1, - /* Lock transport during initialization stage. This lock will ensure, - * that no one will modify critical section until thread state can be - * known - initialization has failed or succeeded. */ - .t_locked = !ba_transport_thread_cleanup_lock(th), }; if (a2dp_validate_bt_sink(t) != 0) @@ -554,11 +550,8 @@ static void *a2dp_sink_sbc(struct ba_transport_thread *th) { uint16_t sbc_bitpool = 0; #endif - ba_transport_thread_cleanup_unlock(th); - io.t_locked = false; - - debug("Starting IO loop: %s", ba_transport_type_to_string(t->type)); - for (;;) { + debug_transport_thread_loop(th, "START"); + for (ba_transport_thread_ready(th);;) { ssize_t len; if ((len = a2dp_poll_and_read_bt(&io, &bt)) <= 0) { @@ -611,6 +604,7 @@ static void *a2dp_sink_sbc(struct ba_transport_thread *th) { } fail: + debug_transport_thread_loop(th, "EXIT"); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_cleanup_pop(!io.t_locked); fail_ffb: @@ -631,10 +625,6 @@ static void *a2dp_source_sbc(struct ba_transport_thread *th) { struct io_thread_data io = { .th = th, .timeout = -1, - /* Lock transport during initialization stage. This lock will ensure, - * that no one will modify critical section until thread state can be - * known - initialization has failed or succeeded. */ - .t_locked = !ba_transport_thread_cleanup_lock(th), }; sbc_t sbc; @@ -690,11 +680,8 @@ static void *a2dp_source_sbc(struct ba_transport_thread *th) { uint16_t seq_number = be16toh(rtp_header->seq_number); uint32_t timestamp = be32toh(rtp_header->timestamp); - ba_transport_thread_cleanup_unlock(th); - io.t_locked = false; - - debug("Starting IO loop: %s", ba_transport_type_to_string(t->type)); - for (;;) { + debug_transport_thread_loop(th, "START"); + for (ba_transport_thread_ready(th);;) { ssize_t samples; if ((samples = a2dp_poll_and_read_pcm(&t->a2dp.pcm, &io, &pcm)) <= 0) { @@ -764,6 +751,7 @@ static void *a2dp_source_sbc(struct ba_transport_thread *th) { } fail: + debug_transport_thread_loop(th, "EXIT"); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_cleanup_pop(!io.t_locked); fail_ffb: @@ -785,7 +773,6 @@ static void *a2dp_sink_mpeg(struct ba_transport_thread *th) { struct io_thread_data io = { .th = th, .rtp_seq_number = -1, - .t_locked = !ba_transport_thread_cleanup_lock(th), }; if (a2dp_validate_bt_sink(t) != 0) @@ -844,11 +831,8 @@ static void *a2dp_sink_mpeg(struct ba_transport_thread *th) { pthread_cleanup_push(PTHREAD_CLEANUP(ba_transport_thread_cleanup_lock), th); - ba_transport_thread_cleanup_unlock(th); - io.t_locked = false; - - debug("Starting IO loop: %s", ba_transport_type_to_string(t->type)); - for (;;) { + debug_transport_thread_loop(th, "START"); + for (ba_transport_thread_ready(th);;) { ssize_t len; if ((len = a2dp_poll_and_read_bt(&io, &bt)) <= 0) { @@ -933,6 +917,7 @@ static void *a2dp_sink_mpeg(struct ba_transport_thread *th) { } fail: + debug_transport_thread_loop(th, "EXIT"); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_cleanup_pop(!io.t_locked); fail_ffb: @@ -958,7 +943,6 @@ static void *a2dp_source_mp3(struct ba_transport_thread *th) { struct io_thread_data io = { .th = th, .timeout = -1, - .t_locked = !ba_transport_thread_cleanup_lock(th), }; lame_t handle; @@ -1069,11 +1053,8 @@ static void *a2dp_source_mp3(struct ba_transport_thread *th) { uint16_t seq_number = be16toh(rtp_header->seq_number); uint32_t timestamp = be32toh(rtp_header->timestamp); - ba_transport_thread_cleanup_unlock(th); - io.t_locked = false; - - debug("Starting IO loop: %s", ba_transport_type_to_string(t->type)); - for (;;) { + debug_transport_thread_loop(th, "START"); + for (ba_transport_thread_ready(th);;) { ssize_t samples; if ((samples = a2dp_poll_and_read_pcm(&t->a2dp.pcm, &io, &pcm)) <= 0) { @@ -1154,6 +1135,7 @@ static void *a2dp_source_mp3(struct ba_transport_thread *th) { } fail: + debug_transport_thread_loop(th, "EXIT"); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_cleanup_pop(!io.t_locked); fail_ffb: @@ -1177,7 +1159,6 @@ static void *a2dp_sink_aac(struct ba_transport_thread *th) { struct io_thread_data io = { .th = th, .rtp_seq_number = -1, - .t_locked = !ba_transport_thread_cleanup_lock(th), }; if (a2dp_validate_bt_sink(t) != 0) @@ -1228,11 +1209,8 @@ static void *a2dp_sink_aac(struct ba_transport_thread *th) { int markbit_quirk = -3; - ba_transport_thread_cleanup_unlock(th); - io.t_locked = false; - - debug("Starting IO loop: %s", ba_transport_type_to_string(t->type)); - for (;;) { + debug_transport_thread_loop(th, "START"); + for (ba_transport_thread_ready(th);;) { ssize_t len; if ((len = a2dp_poll_and_read_bt(&io, &bt)) <= 0) { @@ -1302,6 +1280,7 @@ static void *a2dp_sink_aac(struct ba_transport_thread *th) { } fail: + debug_transport_thread_loop(th, "EXIT"); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_cleanup_pop(!io.t_locked); fail_ffb: @@ -1326,7 +1305,6 @@ static void *a2dp_source_aac(struct ba_transport_thread *th) { struct io_thread_data io = { .th = th, .timeout = -1, - .t_locked = !ba_transport_thread_cleanup_lock(th), }; HANDLE_AACENCODER handle; @@ -1462,11 +1440,8 @@ static void *a2dp_source_aac(struct ba_transport_thread *th) { AACENC_InArgs in_args = { 0 }; AACENC_OutArgs out_args = { 0 }; - ba_transport_thread_cleanup_unlock(th); - io.t_locked = false; - - debug("Starting IO loop: %s", ba_transport_type_to_string(t->type)); - for (;;) { + debug_transport_thread_loop(th, "START"); + for (ba_transport_thread_ready(th);;) { ssize_t samples; if ((samples = a2dp_poll_and_read_pcm(&t->a2dp.pcm, &io, &pcm)) <= 0) { @@ -1543,6 +1518,7 @@ static void *a2dp_source_aac(struct ba_transport_thread *th) { } fail: + debug_transport_thread_loop(th, "EXIT"); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_cleanup_pop(!io.t_locked); fail_ffb: @@ -1565,7 +1541,6 @@ static void *a2dp_sink_aptx(struct ba_transport_thread *th) { struct ba_transport *t = th->t; struct io_thread_data io = { .th = th, - .t_locked = !ba_transport_thread_cleanup_lock(th), }; if (a2dp_validate_bt_sink(t) != 0) @@ -1593,11 +1568,8 @@ static void *a2dp_sink_aptx(struct ba_transport_thread *th) { pthread_cleanup_push(PTHREAD_CLEANUP(ba_transport_thread_cleanup_lock), th); - ba_transport_thread_cleanup_unlock(th); - io.t_locked = false; - - debug("Starting IO loop: %s", ba_transport_type_to_string(t->type)); - for (;;) { + debug_transport_thread_loop(th, "START"); + for (ba_transport_thread_ready(th);;) { ssize_t len; if ((len = a2dp_poll_and_read_bt(&io, &bt)) <= 0) { @@ -1635,6 +1607,7 @@ static void *a2dp_sink_aptx(struct ba_transport_thread *th) { } fail: + debug_transport_thread_loop(th, "EXIT"); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_cleanup_pop(!io.t_locked); fail_ffb: @@ -1657,7 +1630,6 @@ static void *a2dp_source_aptx(struct ba_transport_thread *th) { struct io_thread_data io = { .th = th, .timeout = -1, - .t_locked = !ba_transport_thread_cleanup_lock(th), }; HANDLE_APTX handle; @@ -1685,11 +1657,8 @@ static void *a2dp_source_aptx(struct ba_transport_thread *th) { pthread_cleanup_push(PTHREAD_CLEANUP(ba_transport_thread_cleanup_lock), th); - ba_transport_thread_cleanup_unlock(th); - io.t_locked = false; - - debug("Starting IO loop: %s", ba_transport_type_to_string(t->type)); - for (;;) { + debug_transport_thread_loop(th, "START"); + for (ba_transport_thread_ready(th);;) { ssize_t samples; if ((samples = a2dp_poll_and_read_pcm(&t->a2dp.pcm, &io, &pcm)) <= 0) { @@ -1753,6 +1722,7 @@ static void *a2dp_source_aptx(struct ba_transport_thread *th) { } fail: + debug_transport_thread_loop(th, "EXIT"); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_cleanup_pop(!io.t_locked); fail_ffb: @@ -1775,7 +1745,6 @@ static void *a2dp_sink_aptx_hd(struct ba_transport_thread *th) { struct io_thread_data io = { .th = th, .rtp_seq_number = -1, - .t_locked = !ba_transport_thread_cleanup_lock(th), }; if (a2dp_validate_bt_sink(t) != 0) @@ -1803,11 +1772,8 @@ static void *a2dp_sink_aptx_hd(struct ba_transport_thread *th) { pthread_cleanup_push(PTHREAD_CLEANUP(ba_transport_thread_cleanup_lock), th); - ba_transport_thread_cleanup_unlock(th); - io.t_locked = false; - - debug("Starting IO loop: %s", ba_transport_type_to_string(t->type)); - for (;;) { + debug_transport_thread_loop(th, "START"); + for (ba_transport_thread_ready(th);;) { ssize_t len; if ((len = a2dp_poll_and_read_bt(&io, &bt)) <= 0) { @@ -1850,6 +1816,7 @@ static void *a2dp_sink_aptx_hd(struct ba_transport_thread *th) { } fail: + debug_transport_thread_loop(th, "EXIT"); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_cleanup_pop(!io.t_locked); fail_ffb: @@ -1872,7 +1839,6 @@ static void *a2dp_source_aptx_hd(struct ba_transport_thread *th) { struct io_thread_data io = { .th = th, .timeout = -1, - .t_locked = !ba_transport_thread_cleanup_lock(th), }; HANDLE_APTX handle; @@ -1908,11 +1874,8 @@ static void *a2dp_source_aptx_hd(struct ba_transport_thread *th) { uint16_t seq_number = be16toh(rtp_header->seq_number); uint32_t timestamp = be32toh(rtp_header->timestamp); - ba_transport_thread_cleanup_unlock(th); - io.t_locked = false; - - debug("Starting IO loop: %s", ba_transport_type_to_string(t->type)); - for (;;) { + debug_transport_thread_loop(th, "START"); + for (ba_transport_thread_ready(th);;) { ssize_t samples; if ((samples = a2dp_poll_and_read_pcm(&t->a2dp.pcm, &io, &pcm)) <= 0) { @@ -1984,6 +1947,7 @@ static void *a2dp_source_aptx_hd(struct ba_transport_thread *th) { } fail: + debug_transport_thread_loop(th, "EXIT"); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_cleanup_pop(!io.t_locked); fail_ffb: @@ -2006,7 +1970,6 @@ static void *a2dp_sink_ldac(struct ba_transport_thread *th) { struct io_thread_data io = { .th = th, .rtp_seq_number = -1, - .t_locked = !ba_transport_thread_cleanup_lock(th), }; if (a2dp_validate_bt_sink(t) != 0) @@ -2043,11 +2006,8 @@ static void *a2dp_sink_ldac(struct ba_transport_thread *th) { pthread_cleanup_push(PTHREAD_CLEANUP(ba_transport_thread_cleanup_lock), th); - ba_transport_thread_cleanup_unlock(th); - io.t_locked = false; - - debug("Starting IO loop: %s", ba_transport_type_to_string(t->type)); - for (;;) { + debug_transport_thread_loop(th, "START"); + for (ba_transport_thread_ready(th);;) { ssize_t len; if ((len = a2dp_poll_and_read_bt(&io, &bt)) <= 0) { @@ -2092,6 +2052,7 @@ static void *a2dp_sink_ldac(struct ba_transport_thread *th) { } fail: + debug_transport_thread_loop(th, "EXIT"); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_cleanup_pop(!io.t_locked); fail_ffb: @@ -2115,7 +2076,6 @@ static void *a2dp_source_ldac(struct ba_transport_thread *th) { struct io_thread_data io = { .th = th, .timeout = -1, - .t_locked = !ba_transport_thread_cleanup_lock(th), }; HANDLE_LDAC_BT handle; @@ -2178,11 +2138,8 @@ static void *a2dp_source_ldac(struct ba_transport_thread *th) { uint32_t timestamp = be32toh(rtp_header->timestamp); size_t ts_frames = 0; - ba_transport_thread_cleanup_unlock(th); - io.t_locked = false; - - debug("Starting IO loop: %s", ba_transport_type_to_string(t->type)); - for (;;) { + debug_transport_thread_loop(th, "START"); + for (ba_transport_thread_ready(th);;) { ssize_t samples; if ((samples = a2dp_poll_and_read_pcm(&t->a2dp.pcm, &io, &pcm)) <= 0) { @@ -2250,6 +2207,7 @@ static void *a2dp_source_ldac(struct ba_transport_thread *th) { } fail: + debug_transport_thread_loop(th, "EXIT"); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_cleanup_pop(!io.t_locked); fail_ffb: @@ -2301,7 +2259,8 @@ static void *a2dp_sink_dump(struct ba_transport_thread *th) { goto fail_ffb; } - for (;;) { + debug_transport_thread_loop(th, "START"); + for (ba_transport_thread_ready(th);;) { ssize_t len; if ((len = a2dp_poll_and_read_bt(&io, &bt)) <= 0) { if (len == -1) @@ -2313,6 +2272,7 @@ static void *a2dp_sink_dump(struct ba_transport_thread *th) { } fail: + debug_transport_thread_loop(th, "EXIT"); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); fail_ffb: pthread_cleanup_pop(1); diff --git a/src/ba-transport.c b/src/ba-transport.c index 6da619fd4..f24e46534 100644 --- a/src/ba-transport.c +++ b/src/ba-transport.c @@ -111,6 +111,8 @@ static int transport_thread_init( th->pipe[1] = -1; pthread_mutex_init(&th->mutex, NULL); + pthread_mutex_init(&th->ready_mtx, NULL); + pthread_cond_init(&th->ready, NULL); if (pipe(th->pipe) == -1) return -1; @@ -136,6 +138,7 @@ static void transport_thread_cancel(struct ba_transport_thread *th) { * make sure, that after termination, this thread handler will not * be used anymore. */ th->id = config.main_thread; + th->running = false; } @@ -148,6 +151,8 @@ static void transport_thread_free( if (th->pipe[1] != -1) close(th->pipe[1]); pthread_mutex_destroy(&th->mutex); + pthread_mutex_destroy(&th->ready_mtx); + pthread_cond_destroy(&th->ready); } /** @@ -1026,6 +1031,7 @@ int ba_transport_thread_create( int ret; ba_transport_ref(t); + if ((ret = pthread_create(&th->id, NULL, PTHREAD_ROUTINE(routine), th)) != 0) { error("Couldn't create transport thread: %s", strerror(ret)); th->id = config.main_thread; @@ -1040,6 +1046,13 @@ int ba_transport_thread_create( return 0; } +int ba_transport_thread_ready( + struct ba_transport_thread *th) { + th->running = true; + pthread_cond_signal(&th->ready); + return 0; +} + int ba_transport_thread_send_signal( struct ba_transport_thread *th, enum ba_transport_signal sig) { diff --git a/src/ba-transport.h b/src/ba-transport.h index 0fe213988..83e733048 100644 --- a/src/ba-transport.h +++ b/src/ba-transport.h @@ -144,7 +144,7 @@ struct ba_transport_pcm { struct ba_transport_thread { /* backward reference to transport */ struct ba_transport *t; - /* guard PCM running on this thread */ + /* guard thread structure */ pthread_mutex_t mutex; /* actual thread ID */ pthread_t id; @@ -152,6 +152,10 @@ struct ba_transport_thread { int pipe[2]; /* indicates cleanup lock */ bool cleanup_lock; + /* thread synchronization */ + pthread_mutex_t ready_mtx; + pthread_cond_t ready; + bool running; }; struct ba_transport { @@ -311,6 +315,9 @@ int ba_transport_thread_create( void *(*routine)(struct ba_transport_thread *), const char *name); +int ba_transport_thread_ready( + struct ba_transport_thread *th); + int ba_transport_thread_send_signal( struct ba_transport_thread *th, enum ba_transport_signal sig); @@ -321,4 +328,7 @@ void ba_transport_thread_cleanup(struct ba_transport_thread *th); int ba_transport_thread_cleanup_lock(struct ba_transport_thread *th); int ba_transport_thread_cleanup_unlock(struct ba_transport_thread *th); +#define debug_transport_thread_loop(th, tag) \ + debug("IO loop: %s: %s: %s", tag, __func__, ba_transport_type_to_string((th)->t->type)) + #endif diff --git a/src/bluealsa-dbus.c b/src/bluealsa-dbus.c index 36f99ccce..78a6353e6 100644 --- a/src/bluealsa-dbus.c +++ b/src/bluealsa-dbus.c @@ -342,13 +342,17 @@ static void bluealsa_pcm_open(GDBusMethodInvocation *inv) { struct ba_transport_thread *th = pcm->th; struct ba_transport *t = pcm->t; int pcm_fds[4] = { -1, -1, -1, -1 }; - bool locked = false; size_t i; /* Prevent two (or more) clients trying to * open the same PCM at the same time. */ pthread_mutex_lock(&pcm->dbus_mtx); + /* We must ensure that transport release is not in progress before + * accessing transport critical section. Otherwise, we might have + * the IO thread closing it in the middle of the open procedure! */ + ba_transport_thread_cleanup_lock(th); + /* preliminary check whether HFP codes is selected */ if (t->type.profile & BA_TRANSPORT_PROFILE_MASK_SCO && t->type.codec == HFP_CODEC_UNDEFINED) { @@ -357,12 +361,6 @@ static void bluealsa_pcm_open(GDBusMethodInvocation *inv) { goto fail; } - /* We must ensure that transport release is not in progress before - * accessing transport critical section. Otherwise, we might have - * the IO thread close it in the middle of open procedure! */ - ba_transport_thread_cleanup_lock(th); - locked = true; - if (pcm->fd != -1) { g_dbus_method_invocation_return_error(inv, G_DBUS_ERROR, G_DBUS_ERROR_FAILED, "%s", strerror(EBUSY)); @@ -411,6 +409,16 @@ static void bluealsa_pcm_open(GDBusMethodInvocation *inv) { /* notify our audio thread that the FIFO is ready */ ba_transport_thread_send_signal(th, BA_TRANSPORT_SIGNAL_PCM_OPEN); + /* For source profiles (A2DP Source and SCO Audio Gateway) wait + * until the underlying IO thread is ready to process audio. */ + if (t->type.profile & BA_TRANSPORT_PROFILE_A2DP_SOURCE || + t->type.profile & BA_TRANSPORT_PROFILE_MASK_AG) { + pthread_mutex_lock(&th->ready_mtx); + while (!th->running) + pthread_cond_wait(&th->ready, &th->ready_mtx); + pthread_mutex_unlock(&th->ready_mtx); + } + ba_transport_thread_cleanup_unlock(th); pthread_mutex_unlock(&pcm->dbus_mtx); ba_transport_pcm_unref(pcm); @@ -424,8 +432,7 @@ static void bluealsa_pcm_open(GDBusMethodInvocation *inv) { return; fail: - if (locked) - ba_transport_thread_cleanup_unlock(th); + ba_transport_thread_cleanup_unlock(th); pthread_mutex_unlock(&pcm->dbus_mtx); ba_transport_pcm_unref(pcm); /* clean up created file descriptors */ diff --git a/src/sco.c b/src/sco.c index 984f1c0bb..38db8b7d8 100644 --- a/src/sco.c +++ b/src/sco.c @@ -242,8 +242,8 @@ void *sco_thread(struct ba_transport_thread *th) { { -1, POLLOUT, 0 }, }; - debug("Starting SCO loop: %s", ba_transport_type_to_string(t->type)); - for (;;) { + debug_transport_thread_loop(th, "START"); + for (ba_transport_thread_ready(th);;) { /* prevent an unexpected change of the codec value */ const uint16_t codec = t->type.codec; @@ -576,6 +576,7 @@ void *sco_thread(struct ba_transport_thread *th) { } fail: + debug_transport_thread_loop(th, "EXIT"); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); fail_ffb: #if ENABLE_MSBC diff --git a/test/bluealsa-mock.c b/test/bluealsa-mock.c index 55093f7da..32794062d 100644 --- a/test/bluealsa-mock.c +++ b/test/bluealsa-mock.c @@ -146,6 +146,9 @@ static void *mock_a2dp_sink(struct ba_transport_thread *th) { bool io_paused = false; int x = 0; + debug_transport_thread_loop(th, "START"); + ba_transport_thread_ready(th); + while (sigusr1_count == 0) { int timout = 0; diff --git a/test/test-io.c b/test/test-io.c index dfeadf894..b007008f0 100644 --- a/test/test-io.c +++ b/test/test-io.c @@ -246,6 +246,9 @@ static void *test_io_thread_a2dp_dump_bt(struct ba_transport_thread *th) { uint8_t buffer[1024]; ssize_t len; + debug_transport_thread_loop(th, "START"); + ba_transport_thread_ready(th); + while (poll(pfds, ARRAYSIZE(pfds), 500) > 0) { if ((len = read(pfds[0].fd, buffer, sizeof(buffer))) == -1) { @@ -282,7 +285,9 @@ static void *test_io_thread_a2dp_dump_pcm(struct ba_transport_thread *th) { ck_assert_ptr_ne(f = fopen(fname, "w"), NULL); } - debug("Starting PCM dump: %d", pfds[0].fd); + debug_transport_thread_loop(th, "START"); + ba_transport_thread_ready(th); + while (poll(pfds, ARRAYSIZE(pfds), 500) > 0) { if ((len = read(pfds[0].fd, buffer, sizeof(buffer))) == -1) {