Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix live777 on_track will not be fired. mid RTP Extensions required for Simulcast #196

Merged
merged 2 commits into from
Jul 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 75 additions & 73 deletions liveion/src/forward/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ use std::borrow::ToOwned;
use std::sync::Arc;

use chrono::Utc;
use libwish::Client;
use tokio::sync::{broadcast, RwLock};
use tracing::{debug, info};
use webrtc::api::interceptor_registry::register_default_interceptors;
use webrtc::api::media_engine::MediaEngine;
use webrtc::api::media_engine::{MediaEngine, MIME_TYPE_VP8};
use webrtc::api::setting_engine::SettingEngine;
use webrtc::api::APIBuilder;
use webrtc::data::data_channel::DataChannel;
Expand All @@ -16,21 +17,22 @@ use webrtc::ice_transport::ice_server::RTCIceServer;
use webrtc::interceptor::registry::Registry;
use webrtc::peer_connection::configuration::RTCConfiguration;
use webrtc::peer_connection::RTCPeerConnection;
use webrtc::rtp_transceiver::rtp_codec::{RTCRtpHeaderExtensionCapability, RTPCodecType};
use webrtc::rtp_transceiver::rtp_codec::{
RTCRtpCodecCapability, RTCRtpHeaderExtensionCapability, RTPCodecType,
};
use webrtc::rtp_transceiver::rtp_sender::RTCRtpSender;
use webrtc::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirection;
use webrtc::rtp_transceiver::RTCRtpTransceiverInit;
use webrtc::rtp_transceiver::{RTCPFeedback, RTCRtpTransceiverInit};
use webrtc::sdp::extmap::{SDES_MID_URI, SDES_RTP_STREAM_ID_URI};
use webrtc::track::track_local::track_local_static_rtp::TrackLocalStaticRTP;
use webrtc::track::track_remote::TrackRemote;

use libwish::Client;

use crate::forward::get_peer_id;
use crate::forward::message::ForwardInfo;
use crate::forward::rtcp::RtcpMessage;
use crate::metrics;
use crate::result::Result;
use crate::AppError;
use crate::{metrics, new_broadcast_channel};

use super::media::MediaInfo;
use super::message::{CascadeInfo, ForwardEvent, ForwardEventType};
Expand All @@ -42,30 +44,19 @@ const MESSAGE_SIZE: usize = 1024 * 16;

#[derive(Clone)]
struct DataChannelForward {
publish: (
broadcast::Sender<Vec<u8>>,
Arc<broadcast::Receiver<Vec<u8>>>,
),
subscribe: (
broadcast::Sender<Vec<u8>>,
Arc<broadcast::Receiver<Vec<u8>>>,
),
publish: broadcast::Sender<Vec<u8>>,
subscribe: broadcast::Sender<Vec<u8>>,
}

type PublishRtcpChannel = (
broadcast::Sender<(RtcpMessage, u32)>,
broadcast::Receiver<(RtcpMessage, u32)>,
);

pub(crate) struct PeerForwardInternal {
pub(crate) stream: String,
create_time: i64,
publish_leave_time: RwLock<i64>,
subscribe_leave_time: RwLock<i64>,
publish: RwLock<Option<PublishRTCPeerConnection>>,
publish_tracks: Arc<RwLock<Vec<PublishTrackRemote>>>,
publish_tracks_change: (broadcast::Sender<()>, broadcast::Receiver<()>),
publish_rtcp_channel: PublishRtcpChannel,
publish_tracks_change: broadcast::Sender<()>,
publish_rtcp_channel: broadcast::Sender<(RtcpMessage, u32)>,
subscribe_group: RwLock<Vec<SubscribeRTCPeerConnection>>,
data_channel_forward: DataChannelForward,
ice_server: Vec<RTCIceServer>,
Expand All @@ -74,34 +65,22 @@ pub(crate) struct PeerForwardInternal {

impl PeerForwardInternal {
pub(crate) fn new(stream: impl ToString, ice_server: Vec<RTCIceServer>) -> Self {
let publish_tracks_change = broadcast::channel(1024);
let data_channel_forward_publish = broadcast::channel(1024);
let data_channel_forward_subscribe = broadcast::channel(1024);
let data_channel_forward = DataChannelForward {
publish: (
data_channel_forward_publish.0,
Arc::new(data_channel_forward_publish.1),
),
subscribe: (
data_channel_forward_subscribe.0,
Arc::new(data_channel_forward_subscribe.1),
),
};
let (event_sender, mut recv) = broadcast::channel(16);
tokio::spawn(async move { while recv.recv().await.is_ok() {} });
PeerForwardInternal {
stream: stream.to_string(),
create_time: Utc::now().timestamp_millis(),
publish_leave_time: RwLock::new(0),
subscribe_leave_time: RwLock::new(Utc::now().timestamp_millis()),
publish: RwLock::new(None),
publish_tracks: Arc::new(RwLock::new(Vec::new())),
publish_tracks_change,
publish_rtcp_channel: broadcast::channel(48),
publish_tracks_change: new_broadcast_channel!(16),
publish_rtcp_channel: new_broadcast_channel!(48),
subscribe_group: RwLock::new(Vec::new()),
data_channel_forward,
data_channel_forward: DataChannelForward {
publish: new_broadcast_channel!(1024),
subscribe: new_broadcast_channel!(1024),
},
ice_server,
event_sender,
event_sender: new_broadcast_channel!(16),
}
}

Expand Down Expand Up @@ -241,25 +220,6 @@ impl PeerForwardInternal {
};
}
}

pub async fn reset_cascade_info(&self, id: String, cascade_info: CascadeInfo) -> Result<()> {
let publish = self.publish.read().await;
if publish.is_some() && publish.as_ref().unwrap().id == id {
let mut cascade = publish.as_ref().unwrap().cascade.write().unwrap();
*cascade = Some(cascade_info);
return Ok(());
}

let reforward_group = self.subscribe_group.read().await;
for subscribe in reforward_group.iter() {
if id == subscribe.id {
let mut cascade = subscribe.cascade.write().unwrap();
*cascade = Some(cascade_info);
return Ok(());
}
}
Err(AppError::throw("not found re forward subscribe"))
}
}

// publish
Expand All @@ -284,7 +244,7 @@ impl PeerForwardInternal {
let publish_peer = PublishRTCPeerConnection::new(
self.stream.clone(),
peer.clone(),
self.publish_rtcp_channel.0.subscribe(),
self.publish_rtcp_channel.subscribe(),
cascade,
)
.await?;
Expand Down Expand Up @@ -315,7 +275,7 @@ impl PeerForwardInternal {
{
let mut publish_tracks = self.publish_tracks.write().await;
publish_tracks.clear();
let _ = self.publish_tracks_change.0.send(());
let _ = self.publish_tracks_change.send(());
}
{
let mut publish_leave_time = self.publish_leave_time.write().await;
Expand Down Expand Up @@ -421,7 +381,7 @@ impl PeerForwardInternal {
let mut publish_tracks = self.publish_tracks.write().await;
publish_tracks.push(publish_track_remote);
publish_tracks.sort_by(|a, b| a.rid.cmp(&b.rid));
let _ = self.publish_tracks_change.0.send(());
let _ = self.publish_tracks_change.send(());
Ok(())
}

Expand All @@ -430,8 +390,8 @@ impl PeerForwardInternal {
_peer: Arc<RTCPeerConnection>,
dc: Arc<RTCDataChannel>,
) -> Result<()> {
let sender = self.data_channel_forward.subscribe.0.clone();
let receiver = self.data_channel_forward.publish.0.subscribe();
let sender = self.data_channel_forward.subscribe.clone();
let receiver = self.data_channel_forward.publish.subscribe();
Self::data_channel_forward(dc, sender, receiver).await;
Ok(())
}
Expand Down Expand Up @@ -480,8 +440,8 @@ impl PeerForwardInternal {
recv_sender: u8,
) -> Result<Option<Arc<RTCRtpSender>>> {
Ok(if recv_sender > 0 {
Some(
peer.add_transceiver_from_kind(
let sender = peer
.add_transceiver_from_kind(
kind,
Some(RTCRtpTransceiverInit {
direction: RTCRtpTransceiverDirection::Sendonly,
Expand All @@ -490,8 +450,50 @@ impl PeerForwardInternal {
)
.await?
.sender()
.await,
)
.await;
let track = Arc::new(TrackLocalStaticRTP::new(
RTCRtpCodecCapability {
mime_type: MIME_TYPE_VP8.to_owned(),
clock_rate: 90000,
channels: 0,
sdp_fmtp_line: "".to_owned(),
rtcp_feedback: vec![
RTCPFeedback {
typ: "goog-remb".to_owned(),
parameter: "".to_owned(),
},
RTCPFeedback {
typ: "ccm".to_owned(),
parameter: "fir".to_owned(),
},
RTCPFeedback {
typ: "nack".to_owned(),
parameter: "".to_owned(),
},
RTCPFeedback {
typ: "nack".to_owned(),
parameter: "pli".to_owned(),
},
],
},
"webrtc".to_string(),
format!("{}-{}", "webrtc", kind),
));
// ssrc for sdp
let _ = sender.replace_track(Some(track)).await;
info!(
"[{}] new sender , kind : {}, ssrc : {}",
get_peer_id(peer),
kind,
sender
.get_parameters()
.await
.encodings
.first()
.unwrap()
.ssrc
);
Some(sender)
} else {
None
})
Expand All @@ -518,10 +520,10 @@ impl PeerForwardInternal {
cascade.clone(),
self.stream.clone(),
peer.clone(),
self.publish_rtcp_channel.0.clone(),
self.publish_rtcp_channel.clone(),
(
self.publish_tracks.clone(),
self.publish_tracks_change.0.clone(),
self.publish_tracks_change.clone(),
),
(video_sender, audio_sender),
)
Expand Down Expand Up @@ -551,7 +553,7 @@ impl PeerForwardInternal {
if subscribe.id == session {
flag = true;
metrics::SUBSCRIBE.dec();
if let Some(cascade) = subscribe.cascade.read().unwrap().as_ref() {
if let Some(cascade) = subscribe.cascade.clone() {
reforward_flat = true;
metrics::REFORWARD.dec();

Expand Down Expand Up @@ -601,8 +603,8 @@ impl PeerForwardInternal {
_peer: Arc<RTCPeerConnection>,
dc: Arc<RTCDataChannel>,
) -> Result<()> {
let sender = self.data_channel_forward.publish.0.clone();
let receiver = self.data_channel_forward.subscribe.0.subscribe();
let sender = self.data_channel_forward.publish.clone();
let receiver = self.data_channel_forward.subscribe.subscribe();
Self::data_channel_forward(dc, sender, receiver).await;
Ok(())
}
Expand Down
28 changes: 13 additions & 15 deletions liveion/src/forward/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,11 @@ impl PeerForward {
let peer = self
.new_subscription_peer(MediaInfo::try_from(offer.unmarshal()?)?)
.await?;
let _ = self.internal.add_subscribe(peer.clone(), None).await;
let (sdp, session) = (
peer_complete(offer, peer.clone()).await?,
get_peer_id(&peer),
);
let _ = self.internal.add_subscribe(peer.clone(), None).await;
Ok((sdp, session))
}

Expand All @@ -245,15 +245,7 @@ impl PeerForward {
audio_transceiver: (0, 1),
})
.await?;
let mut cascade_info: CascadeInfo = CascadeInfo {
src: None,
dst: Some(dst.clone()),
token: token.clone(),
resource: None,
};
self.internal
.add_subscribe(peer.clone(), Some(cascade_info.clone()))
.await?;

let offer: RTCSessionDescription = peer.create_offer(None).await?;
let mut gather_complete = peer.gathering_complete_promise().await;
peer.set_local_description(offer).await?;
Expand All @@ -268,12 +260,18 @@ impl PeerForward {
);
match client.wish(description.sdp.clone()).await {
Ok((target_sdp, _)) => {
self.internal
.add_subscribe(
peer.clone(),
Some(CascadeInfo {
src: None,
dst: Some(dst.clone()),
token: token.clone(),
resource: client.resource_url,
}),
)
.await?;
let _ = peer.set_remote_description(target_sdp).await;
cascade_info.resource = client.resource_url;
let _ = self
.internal
.reset_cascade_info(get_peer_id(&peer), cascade_info)
.await;
Ok(())
}
Err(err) => {
Expand Down
8 changes: 4 additions & 4 deletions liveion/src/forward/publish.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::{Arc, RwLock, Weak};
use std::sync::{Arc, Weak};

use anyhow::{anyhow, Result};
use chrono::Utc;
Expand All @@ -18,7 +18,7 @@ pub(crate) struct PublishRTCPeerConnection {
pub(crate) peer: Arc<RTCPeerConnection>,
pub(crate) media_info: MediaInfo,
pub(crate) create_time: i64,
pub(crate) cascade: RwLock<Option<CascadeInfo>>,
pub(crate) cascade: Option<CascadeInfo>,
}

impl PublishRTCPeerConnection {
Expand All @@ -42,7 +42,7 @@ impl PublishRTCPeerConnection {
peer,
media_info,
create_time: Utc::now().timestamp_millis(),
cascade: RwLock::new(cascade),
cascade,
})
}

Expand All @@ -51,7 +51,7 @@ impl PublishRTCPeerConnection {
id: self.id.clone(),
create_time: self.create_time,
connect_state: self.peer.connection_state(),
cascade: self.cascade.read().unwrap().clone(),
cascade: self.cascade.clone(),
}
}

Expand Down
Loading