Skip to content

Commit

Permalink
remove refresh_subscribe method
Browse files Browse the repository at this point in the history
  • Loading branch information
hongcha98 committed Aug 27, 2023
1 parent 6e41682 commit 03d281e
Showing 1 changed file with 45 additions and 43 deletions.
88 changes: 45 additions & 43 deletions src/forward/forward_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use anyhow::Result;
use log::info;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio::sync::RwLock;
use webrtc::api::APIBuilder;
use webrtc::api::interceptor_registry::register_default_interceptors;
use webrtc::api::media_engine::MediaEngine;
use webrtc::api::APIBuilder;
use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
use webrtc::ice_transport::ice_server::RTCIceServer;
use webrtc::interceptor::registry::Registry;
Expand All @@ -18,11 +18,13 @@ use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
use webrtc::peer_connection::RTCPeerConnection;
use webrtc::rtcp::payload_feedbacks::picture_loss_indication::PictureLossIndication;
use webrtc::rtp::packet::Packet;
use webrtc::rtp_transceiver::RTCRtpTransceiverInit;
use webrtc::rtp_transceiver::rtp_codec::{RTCRtpCodecParameters, RTPCodecType};
use webrtc::rtp_transceiver::rtp_codec::{
RTCRtpCodecCapability, RTCRtpCodecParameters, RTPCodecType,
};
use webrtc::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirection;
use webrtc::track::track_local::{TrackLocal, TrackLocalWriter};
use webrtc::rtp_transceiver::RTCRtpTransceiverInit;
use webrtc::track::track_local::track_local_static_rtp::TrackLocalStaticRTP;
use webrtc::track::track_local::{TrackLocal, TrackLocalWriter};
use webrtc::track::track_remote::TrackRemote;

use super::constant::*;
Expand Down Expand Up @@ -145,20 +147,39 @@ impl PeerForwardInternal {
}

pub async fn add_subscribe(&self, peer: Arc<RTCPeerConnection>) -> Result<()> {
let peer_wrap = PeerWrap(peer.clone());
let anchor_track_codec_map = self.anchor_track_codec_map.read().await;
if anchor_track_codec_map.is_empty() {
return Err(anyhow::anyhow!("anchor not push track"));
}
for (kind, codec) in anchor_track_codec_map.iter() {
let sender = self
.peer_add_track(peer.clone(), kind.as_str(), codec.capability.clone())
.await?;
self.anchor_track_forward_map
.get(kind)
.unwrap()
.write()
.await
.insert(peer_wrap.clone(), sender);
}
let mut subscribe_peers = self.subscribe_group.write().await;
subscribe_peers.push(PeerWrap(peer.clone()));
subscribe_peers.push(peer_wrap);
drop(subscribe_peers);
info!("[{}] [subscribe] [{}] up", self.id, peer.get_stats_id());
let _ = self.refresh_subscribe().await?;
Ok(())
}

pub async fn remove_subscribe(&self, peer: Arc<RTCPeerConnection>) -> Result<()> {
let peer_wrap = PeerWrap(peer.clone());
for (_, track_forward_map) in self.anchor_track_forward_map.iter() {
let mut track_forward_map = track_forward_map.write().await;
track_forward_map.remove(&peer_wrap);
}
let mut subscribe_peers = self.subscribe_group.write().await;
subscribe_peers.retain(|x| x != &PeerWrap(peer.clone()));
subscribe_peers.retain(|x| x != &peer_wrap);
drop(subscribe_peers);
info!("[{}] [subscribe] [{}] down", self.id, peer.get_stats_id());
let _ = self.refresh_subscribe().await?;
Ok(())
}

Expand All @@ -184,25 +205,6 @@ impl PeerForwardInternal {
Ok(())
}

pub(crate) async fn refresh_subscribe(&self) -> Result<()> {
let subscribe_group = self.subscribe_group.read().await;
for (kind, track_forward_map) in self.anchor_track_forward_map.iter() {
let mut track_forward_map = track_forward_map.write().await;
track_forward_map.retain(|k, _| subscribe_group.contains(k));
for peer_wrap in subscribe_group.iter() {
if !track_forward_map.contains_key(peer_wrap) {
if let Ok(sender) = self
.peer_add_track(peer_wrap.0.clone(), kind.as_str())
.await
{
track_forward_map.insert(peer_wrap.clone(), sender);
}
}
}
}
Ok(())
}

pub(crate) async fn anchor_track_forward(&self, track: Arc<TrackRemote>) {
let mut b = vec![0u8; 1500];
let track_key = self.get_anchor_track_key(track.clone());
Expand Down Expand Up @@ -248,19 +250,25 @@ impl PeerForwardInternal {
}),
)
.await?;
let audio_transceiver = peer.add_transceiver_from_kind(
RTPCodecType::Audio,
Some(RTCRtpTransceiverInit {
direction: RTCRtpTransceiverDirection::Recvonly,
send_encodings: Vec::new(),
}),
).await?;
let audio_transceiver = peer
.add_transceiver_from_kind(
RTPCodecType::Audio,
Some(RTCRtpTransceiverInit {
direction: RTCRtpTransceiverDirection::Recvonly,
send_encodings: Vec::new(),
}),
)
.await?;
let track_codec_map = self.anchor_track_codec_map.read().await;
if let Some(codec) = track_codec_map.get(VIDEO_KIND) {
video_transceiver.set_codec_preferences(vec![codec.clone()]).await?;
video_transceiver
.set_codec_preferences(vec![codec.clone()])
.await?;
}
if let Some(codec) = track_codec_map.get(AUDIO_KIND) {
audio_transceiver.set_codec_preferences(vec![codec.clone()]).await?;
audio_transceiver
.set_codec_preferences(vec![codec.clone()])
.await?;
}
}
Ok(peer)
Expand All @@ -269,14 +277,8 @@ impl PeerForwardInternal {
&self,
peer: Arc<RTCPeerConnection>,
kind: &str,
codec: RTCRtpCodecCapability,
) -> Result<SenderForwardData> {
let anchor_track_codec_map = self.anchor_track_codec_map.read().await;
let codec = anchor_track_codec_map.get(kind);
if codec.is_none() {
return Err(anyhow::anyhow!("kind codec not found"));
}
let codec = codec.unwrap().clone().capability;
drop(anchor_track_codec_map);
let track = Arc::new(TrackLocalStaticRTP::new(
codec,
kind.to_owned(),
Expand Down

0 comments on commit 03d281e

Please sign in to comment.