diff --git a/include/baresip.h b/include/baresip.h index a880f3509c..09f01a0a1e 100644 --- a/include/baresip.h +++ b/include/baresip.h @@ -1490,6 +1490,7 @@ int jbuf_stats(const struct jbuf *jb, struct jbuf_stat *jstat); int jbuf_debug(struct re_printf *pf, const struct jbuf *jb); uint32_t jbuf_frames(const struct jbuf *jb); uint32_t jbuf_packets(const struct jbuf *jb); +int32_t jbuf_next_play(const struct jbuf *jb); /* diff --git a/src/jbuf.c b/src/jbuf.c index 5cfc4c4e60..b6914233fd 100644 --- a/src/jbuf.c +++ b/src/jbuf.c @@ -462,7 +462,7 @@ static uint32_t calc_playout_time(struct jbuf *jb, struct packet *p) /* Check min/max latency requirements */ /* @TODO: */ - + playout_time += 960 * jb->min; return playout_time; } @@ -707,6 +707,8 @@ void jbuf_flush(struct jbuf *jb) if (!jb) return; + //@TODO: reset playout specifc values + mtx_lock(jb->lock); if (jb->packetl.head) { DEBUG_INFO("flush: %u frames\n", jb->n); @@ -754,6 +756,22 @@ uint32_t jbuf_packets(const struct jbuf *jb) } +int32_t jbuf_next_play(const struct jbuf *jb) +{ + if (!jb || !jb->packetl.head) + return -1; + + struct packet *p = jb->packetl.head->data; + + uint32_t current = (uint32_t)(tmr_jiffies() * jb->p.srate / 1000); + + if (p->playout_time < current) + return 0; /* already late */ + + return (p->playout_time - current) * 1000 / jb->p.srate; +} + + /** * Get jitter buffer statistics * diff --git a/src/stream.c b/src/stream.c index ac5cbc35e0..60e4c3f850 100644 --- a/src/stream.c +++ b/src/stream.c @@ -76,6 +76,7 @@ struct stream { bool mnat_connected; /**< Media NAT is connected */ bool menc_secure; /**< Media stream is secure */ struct tmr tmr_natph; /**< Timer for NAT pinhole */ + struct tmr tmr_decode; /**< Decode fallback timer */ uint32_t natphc; /**< NAT pinhole RTP counter */ bool pinhole; /**< NAT pinhole flag */ stream_pt_h *pth; /**< Stream payload type handler */ @@ -97,7 +98,7 @@ struct stream { }; -static int stream_decode(struct stream *s); +static void stream_decode(struct stream *s); static void print_rtp_stats(const struct stream *s) @@ -147,6 +148,7 @@ static void stream_destructor(void *arg) tmr_cancel(&s->rx.tmr_rtp); tmr_cancel(&s->tmr_natph); + tmr_cancel(&s->tmr_decode); list_unlink(&s->le); mem_deref(s->sdp); mem_deref(s->mes); @@ -392,6 +394,15 @@ static int handle_rtp(struct stream *s, const struct rtp_header *hdr, } +/* Fallback tmr if no packets arrive in time (avoids jbuf overrun) */ +static void decode_fallback(void *arg) +{ + struct stream *s = arg; + + stream_decode(s); +} + + static void rtp_handler(const struct sa *src, const struct rtp_header *hdr, struct mbuf *mb, void *arg) { @@ -470,11 +481,7 @@ static void rtp_handler(const struct sa *src, const struct rtp_header *hdr, metric_inc_err(s->rx.metric); } - uint32_t n = jbuf_packets(s->rx.jbuf); - while (n--) { - if (stream_decode(s) != EAGAIN) - break; - } + stream_decode(s); } else { (void)handle_rtp(s, hdr, mb, 0, false); @@ -490,33 +497,39 @@ static void rtp_handler(const struct sa *src, const struct rtp_header *hdr, * @return 0 if success, EAGAIN if it should be called again in order to avoid * a jitter buffer overflow, otherwise errorcode */ -static int stream_decode(struct stream *s) +static void stream_decode(struct stream *s) { struct rtp_header hdr; void *mb; int lostc; + int32_t delay; int err; - int err2; - - if (!s) - return EINVAL; - if (!s->rx.jbuf) - return ENOENT; + if (!s || !s->rx.jbuf) + return; - err = jbuf_get(s->rx.jbuf, &hdr, &mb); - if (err && err != EAGAIN) - return ENOENT; + uint32_t n = jbuf_packets(s->rx.jbuf); + while (n--) { + err = jbuf_get(s->rx.jbuf, &hdr, &mb); + if (err && err != EAGAIN) + goto out; - lostc = lostcalc(&s->rx, hdr.seq); + lostc = lostcalc(&s->rx, hdr.seq); - err2 = handle_rtp(s, &hdr, mb, lostc > 0 ? lostc : 0, err == EAGAIN); - mem_deref(mb); + err = handle_rtp(s, &hdr, mb, lostc > 0 ? lostc : 0, + err == EAGAIN); + mem_deref(mb); - if (err2 == EAGAIN) - return err2; + if (err != EAGAIN) + goto out; + } - return err; +out: + delay = jbuf_next_play(s->rx.jbuf); + if (delay < 0) + return; + delay += delay / 2; + tmr_start(&s->tmr_decode, delay, decode_fallback, s); } @@ -768,6 +781,7 @@ int stream_alloc(struct stream **sp, struct list *streaml, s->ldir = SDP_SENDRECV; s->pinhole = true; tmr_init(&s->tmr_natph); + tmr_init(&s->tmr_decode); if (prm->use_rtp) { err = stream_sock_alloc(s, prm->af);