Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main'
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaseizinger committed May 21, 2024
2 parents 49b9ac8 + a08d0f9 commit efd420e
Show file tree
Hide file tree
Showing 13 changed files with 98 additions and 46 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Unreleased

* Bump sctp-proto to 0.2.2 #511
* Adjust logging levels to be less noisy #510
* Fix crash when using VLA (or other) optional RTP exts with SDP #509
* Re-add manually invalidated IceAgent candidates #508
* New API to reset BWE state #506
* Change parameter in BWE algo to match libwebrtc #506

# 0.5.1

* Expose STUN packet split_username() fn #505
Expand Down
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ thiserror = "1.0.38"
tracing = "0.1.37"
fastrand = "2.0.1"
once_cell = "1.17.0"
sctp-proto = "0.2.1"
sctp-proto = "0.2.2"
combine = "4.6.6"
# Sadly no DTLS support in rustls.
# If you want to use a system provided openssl you can set env variable
Expand All @@ -35,13 +35,13 @@ crc = "3.0.0"
serde = { version = "1.0.152", features = ["derive"] }

[target.'cfg(unix)'.dependencies]
sha-1 = { version = "0.10.1", features = ["asm"] }
sha1 = { version = "0.10.6", features = ["asm"] }

# Don't use `asm` on Windows until https://github.com/RustCrypto/asm-hashes/issues/45 is fixed.
# The `asm` feature isn't compatible with `windows-msvc` toolchain and `openssl` breaks if we want to use `windows-gnu`.
# Thus, don't use `asm` feature on Windows.
[target.'cfg(windows)'.dependencies]
sha-1 = { version = "0.10.1" }
sha1 = { version = "0.10.6" }

[dev-dependencies]
rouille = { version = "3.5.0", features = ["ssl"] }
Expand Down
12 changes: 12 additions & 0 deletions src/bwe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,16 @@ impl<'a> Bwe<'a> {
pub fn set_desired_bitrate(&mut self, desired_bitrate: Bitrate) {
self.0.session.set_bwe_desired_bitrate(desired_bitrate);
}

/// Reset the BWE with a new init_bitrate
///
/// # Example
///
/// This method is useful when you initially start with only an audio stream. In this case, the BWE will report a very low estimated bitrate.
/// Later, when you start a video stream, the estimated bitrate will be affected by the previous low bitrate, resulting in a very low estimated bitrate, which can cause poor video stream quality.
/// To avoid this, you need to warm up the video stream for a while then calling reset with a provided init_bitrate.
///
pub fn reset(&mut self, init_bitrate: Bitrate) {
self.0.session.reset_bwe(init_bitrate);
}
}
14 changes: 12 additions & 2 deletions src/change/sdp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1035,11 +1035,21 @@ fn update_media(
for (id, ext) in m.extmaps().into_iter() {
// The remapping of extensions should already have happened, which
// means the ID are matching in the session to the remote.
if exts.lookup(id) != Some(ext) {

// Does the ID exist in session?
let in_session = match exts.lookup(id) {
Some(v) => v,
None => continue,
};

if in_session != ext {
// Don't set any extensions that aren't enabled in Session.
continue;
}
remote_extmap.set(id, ext.clone());

// Use the Extension from session, since there might be a special
// serializer for cases like VLA.
remote_extmap.set(id, in_session.clone());
}
media.set_remote_extmap(remote_extmap);

Expand Down
39 changes: 26 additions & 13 deletions src/ice/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,6 @@ use crate::util::NonCryptographicRng;
use super::candidate::{Candidate, CandidateKind};
use super::pair::{CandidatePair, CheckState, PairId};

/// Timing advance (Ta) value.
///
/// ICE agents SHOULD use a default Ta value, 50 ms, but MAY use another
/// value based on the characteristics of the associated data.
const TIMING_ADVANCE: Duration = Duration::from_millis(50);

/// Handles the ICE protocol for a given peer.
///
/// Each connection between two peers corresponds to one [`IceAgent`] on either end.
Expand All @@ -31,6 +25,12 @@ pub struct IceAgent {
/// This drives the state forward.
last_now: Option<Instant>,

/// Timing advance (Ta) value.
///
/// ICE agents SHOULD use a default Ta value, 50 ms, but MAY use another
/// value based on the characteristics of the associated data.
timing_advance: Duration,

/// Whether this agent is operating as ice-lite.
/// ice-lite is a minimal version of the ICE specification, intended for servers
/// running on a public IP address. ice-lite requires the media server to only answer
Expand Down Expand Up @@ -268,6 +268,7 @@ impl IceAgent {
discovered_recv: HashSet::new(),
nominated_send: None,
stats: IceAgentStats::default(),
timing_advance: Duration::from_millis(50),
}
}

Expand All @@ -285,6 +286,16 @@ impl IceAgent {
self.ice_lite = enabled;
}

/// Set a new timing advance (Ta) value.
///
/// Ta specifies the minimum increment of time that has to pass between calls to
/// [`IceAgent::handle_timeout`]s (guided via [`IceAgent::poll_timeout`]).
///
/// Defaults to 50ms.
pub fn set_timing_advance(&mut self, duration: Duration) {
self.timing_advance = duration
}

/// Local ice credentials.
pub fn local_credentials(&self) -> &IceCreds {
&self.local_credentials
Expand Down Expand Up @@ -1047,7 +1058,7 @@ impl IceAgent {

// We must empty the queued replies or stuff to send as soon as possible.
if has_request || has_transmit {
return Some(last_now + TIMING_ADVANCE);
return Some(last_now + self.timing_advance);
}

// when do we need to handle the next candidate pair?
Expand All @@ -1063,8 +1074,8 @@ impl IceAgent {

// Time must advance with at least Ta.
let next = if let Some(next) = maybe_next {
if next < last_now + TIMING_ADVANCE {
last_now + TIMING_ADVANCE
if next < last_now + self.timing_advance {
last_now + self.timing_advance
} else {
next
}
Expand Down Expand Up @@ -1333,9 +1344,11 @@ impl IceAgent {

let reply = StunMessage::reply(req.trans_id, req.source);

debug!(
trace!(
"Send STUN reply: {} -> {} {:?}",
local_addr, remote_addr, reply
local_addr,
remote_addr,
reply
);

let mut buf = vec![0_u8; DATAGRAM_MTU];
Expand Down Expand Up @@ -1378,7 +1391,7 @@ impl IceAgent {
use_candidate,
);

debug!(
trace!(
"Send STUN request: {} -> {} {:?}",
local.base(),
remote.addr(),
Expand Down Expand Up @@ -1923,7 +1936,7 @@ mod test {
agent.handle_timeout(now1);
let now2 = agent.poll_timeout().unwrap();

assert!(now2 - now1 == TIMING_ADVANCE);
assert!(now2 - now1 == Duration::from_millis(50));
}

#[test]
Expand Down
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1274,7 +1274,9 @@ impl Rtc {

match &o {
Output::Event(e) => match e {
Event::ChannelData(_) | Event::MediaData(_) => trace!("{:?}", e),
Event::ChannelData(_) | Event::MediaData(_) | Event::RtpPacket(_) => {
trace!("{:?}", e)
}
_ => debug!("{:?}", e),
},
Output::Transmit(t) => {
Expand Down
8 changes: 4 additions & 4 deletions src/packet/bwe/acked_bitrate_estimator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ impl AckedBitrateEstimator {
// current estimate. With low values of uncertainty_symmetry_cap_ we add more
// uncertainty to increases than to decreases. For higher values we approach
// symmetry.
let sample_uncertainty =
scale * (estimate_bps - sample_estimate_bps).abs() / (estimate_bps.max(25_000.0));
let sample_uncertainty = scale * (estimate_bps - sample_estimate_bps).abs() / estimate_bps;
let sample_var = sample_uncertainty.powf(2.0);

// Update a bayesian estimate of the rate, weighting it lower if the sample
Expand All @@ -85,6 +84,7 @@ impl AckedBitrateEstimator {
let mut new_estimate = (sample_var * estimate_bps
+ pred_bitrate_estimate_var * sample_estimate_bps)
/ (sample_var + pred_bitrate_estimate_var);

new_estimate = new_estimate.max(ESTIMATE_FLOOR.as_f64());
self.estimate = Some(Bitrate::bps(new_estimate.ceil() as u64));
self.estimate_var =
Expand Down Expand Up @@ -136,7 +136,7 @@ impl AckedBitrateEstimator {

self.sum += packet_size;

estimate.map(|e| (e, false))
estimate.map(|e| (e, is_small))
}
}

Expand Down Expand Up @@ -200,7 +200,7 @@ mod test {

assert_eq!(
estimate.as_u64(),
99530,
108320,
"AckedBitrateEstiamtor should produce the correct bitrate"
);
}
Expand Down
16 changes: 13 additions & 3 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,12 @@ impl Session {
self.last_twcc = now;
let mut twcc = self.twcc_rx_register.build_report(DATAGRAM_MTU - 100)?;

// These SSRC are on medial level, but twcc is on session level,
// These SSRC are on media level, but twcc is on session level,
// we fill in the first discovered media SSRC in each direction.
twcc.sender_ssrc = sender_ssrc;
twcc.ssrc = self.streams.first_ssrc_remote();

debug!("Created feedback TWCC: {:?}", twcc);
trace!("Created feedback TWCC: {:?}", twcc);
self.feedback_tx.push_front(Rtcp::Twcc(twcc));
Some(())
}
Expand Down Expand Up @@ -468,7 +468,7 @@ impl Session {

for fb in RtcpFb::from_rtcp(self.feedback_rx.drain(..)) {
if let RtcpFb::Twcc(twcc) = fb {
debug!("Handle TWCC: {:?}", twcc);
trace!("Handle TWCC: {:?}", twcc);
let range = self.twcc_tx_register.apply_report(twcc, now);

if let Some(bwe) = &mut self.bwe {
Expand Down Expand Up @@ -796,6 +796,12 @@ impl Session {
}
}

pub fn reset_bwe(&mut self, init_bitrate: Bitrate) {
if let Some(bwe) = self.bwe.as_mut() {
bwe.reset(init_bitrate);
}
}

pub fn line_count(&self) -> usize {
self.medias.len() + if self.app.is_some() { 1 } else { 0 }
}
Expand Down Expand Up @@ -898,6 +904,10 @@ impl Bwe {
self.bwe.handle_timeout(now);
}

pub fn reset(&mut self, init_bitrate: Bitrate) {
self.bwe = SendSideBandwithEstimator::new(init_bitrate);
}

pub fn update<'t>(
&mut self,
records: impl Iterator<Item = &'t crate::rtp_::TwccSendRecord>,
Expand Down
9 changes: 6 additions & 3 deletions src/streams/receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,9 +499,12 @@ impl StreamRx {

let xr = self.create_extended_receiver_report(now);

debug!(
trace!(
"Created feedback RR/XR ({:?}/{:?}): {:?} {:?}",
self.mid, self.rid, rr, xr
self.mid,
self.rid,
rr,
xr
);
feedback.push_back(Rtcp::ReceiverReport(rr));
feedback.push_back(Rtcp::ExtendedReport(xr));
Expand Down Expand Up @@ -580,7 +583,7 @@ impl StreamRx {
nack.sender_ssrc = sender_ssrc;
nack.ssrc = self.ssrc;

debug!("Created feedback NACK: {:?}", nack);
trace!("Created feedback NACK: {:?}", nack);
feedback.push_back(Rtcp::Nack(nack));
self.stats.nacks += 1;
}
Expand Down
2 changes: 1 addition & 1 deletion src/streams/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ impl StreamTx {
pub(crate) fn create_sr_and_update(&mut self, now: Instant, feedback: &mut VecDeque<Rtcp>) {
let sr = self.create_sender_report(now);

debug!("Created feedback SR: {:?}", sr);
trace!("Created feedback SR: {:?}", sr);
feedback.push_back(Rtcp::SenderReport(sr));

if let Some(ds) = self.create_sdes() {
Expand Down
8 changes: 0 additions & 8 deletions src/util/time_tricks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ pub trait InstantExt {
/// panics if `time` goes backwards, i.e. we use this for one Instant and then an earlier Instant.
fn to_unix_duration(&self) -> Duration;

/// Convert a unix duration to our relative Instant.
fn from_unix_duration(d: Duration) -> Self;

/// Convert an Instant to a Duration for ntp time.
fn to_ntp_duration(&self) -> Duration;

Expand Down Expand Up @@ -104,11 +101,6 @@ impl InstantExt for Instant {
.expect("clock to go forwards from unix epoch")
}

fn from_unix_duration(d: Duration) -> Self {
let since_beginning_of_time = d.saturating_sub(epoch_to_beginning());
BEGINNING_OF_TIME.0 + since_beginning_of_time
}

fn to_ntp_duration(&self) -> Duration {
self.to_unix_duration() + Duration::from_micros(MICROS_1900)
}
Expand Down
7 changes: 5 additions & 2 deletions tests/user-rtp-header-extension.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::net::Ipv4Addr;
use std::time::Duration;

use str0m::change::SdpOffer;
use str0m::format::Codec;
use str0m::media::{Direction, MediaKind};
use str0m::rtp::Extension;
Expand Down Expand Up @@ -89,8 +90,10 @@ pub fn user_rtp_header_extension() -> Result<(), RtcError> {
let mut change = l.sdp_api();
let mid = change.add_media(MediaKind::Audio, Direction::SendRecv, None, None);
let (offer, pending) = change.apply().unwrap();

let answer = r.rtc.sdp_api().accept_offer(offer)?;
let offer_str = offer.to_sdp_string();
let offer_parsed =
SdpOffer::from_sdp_string(&offer_str).expect("Should parse offer from string");
let answer = r.rtc.sdp_api().accept_offer(offer_parsed)?;
l.rtc.sdp_api().accept_answer(pending, answer)?;

// Verify that the extension is negotiated.
Expand Down

0 comments on commit efd420e

Please sign in to comment.