Skip to content

Commit

Permalink
update UnboundedSender and sequence_number out of bounds
Browse files Browse the repository at this point in the history
  • Loading branch information
hongcha98 committed Aug 17, 2023
1 parent 3b73dfe commit cac3ca9
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 12 deletions.
20 changes: 11 additions & 9 deletions src/forward/forward_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,25 @@ use std::sync::{Arc, Weak};
use std::time::Duration;

use anyhow::Result;
use tokio::sync::mpsc::{channel, Sender};
use tokio::sync::{Mutex, RwLock};
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use uuid::Uuid;
use webrtc::api::media_engine::{MIME_TYPE_OPUS, MIME_TYPE_VP8};
use webrtc::peer_connection::RTCPeerConnection;
use webrtc::rtcp::payload_feedbacks::picture_loss_indication::PictureLossIndication;
use webrtc::rtp::packet::Packet;
use webrtc::rtp_transceiver::rtp_codec::RTCRtpCodecCapability;
use webrtc::track::track_local::track_local_static_rtp::TrackLocalStaticRTP;
use webrtc::track::track_local::{TrackLocal, TrackLocalWriter};
use webrtc::track::track_local::track_local_static_rtp::TrackLocalStaticRTP;
use webrtc::track::track_remote::TrackRemote;

const VIDEO_KIND: &str = "video";
const AUDIO_KIND: &str = "audio";

type ForwardData = Arc<Packet>;

type SenderForwardData = UnboundedSender<ForwardData>;

struct PeerWrap(Arc<RTCPeerConnection>);

impl Clone for PeerWrap {
Expand Down Expand Up @@ -79,9 +81,9 @@ pub struct PeerForwardInternal {
anchor: Arc<RwLock<Option<Arc<RTCPeerConnection>>>>,
subscribe_group: Arc<RwLock<Vec<PeerWrap>>>,
anchor_track_forward_map:
Arc<RwLock<HashMap<TrackRemoteWrap, Arc<RwLock<HashMap<PeerWrap, Sender<ForwardData>>>>>>>,
Arc<RwLock<HashMap<TrackRemoteWrap, Arc<RwLock<HashMap<PeerWrap, SenderForwardData>>>>>>,
anchor_track_forward_map_retain:
Arc<Mutex<HashMap<String, Arc<RwLock<HashMap<PeerWrap, Sender<ForwardData>>>>>>>,
Arc<Mutex<HashMap<String, Arc<RwLock<HashMap<PeerWrap, SenderForwardData>>>>>>,
}

impl PeerForwardInternal {
Expand Down Expand Up @@ -284,14 +286,14 @@ impl PeerForwardInternal {
break;
}
let anchor_track_forward = anchor_track_forward.unwrap().read().await;
let senders: Vec<Sender<ForwardData>> = anchor_track_forward
let senders: Vec<SenderForwardData> = anchor_track_forward
.iter()
.map(|(_, sender)| sender.clone())
.collect();
drop(anchor_track_forward);
let packet = Arc::new(rtp_packet);
for sender in senders.iter() {
let _ = sender.send(packet.clone()).await;
let _ = sender.send(packet.clone());
}
}
println!("[{}] [anchor] [track-{}] forward down", self.id, track.id());
Expand All @@ -301,7 +303,7 @@ impl PeerForwardInternal {
&self,
peer: Arc<RTCPeerConnection>,
kind: &str,
) -> Result<Sender<ForwardData>> {
) -> Result<SenderForwardData> {
let uuid = Uuid::new_v4().to_string();
let (mime_type, id, stream_id) = match kind {
VIDEO_KIND => (
Expand All @@ -327,7 +329,7 @@ impl PeerForwardInternal {
let sender = peer
.add_track(Arc::clone(&track) as Arc<dyn TrackLocal + Send + Sync>)
.await?;
let (send, mut recv) = channel::<ForwardData>(32);
let (send, mut recv) = unbounded_channel::<ForwardData>();
let self_id = self.id.clone();
tokio::spawn(async move {
println!(
Expand All @@ -342,7 +344,7 @@ impl PeerForwardInternal {
if let Err(err) = track.write_rtp(&packet).await {
println!("video_track.write err: {}", err);
}
sequence_number += 1;
sequence_number = (sequence_number + 1) % 65535;
}
let _ = peer.remove_track(&sender).await;
println!(
Expand Down
6 changes: 3 additions & 3 deletions src/forward/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
use std::sync::{Arc, Mutex};

use anyhow::Result;
use webrtc::api::APIBuilder;
use webrtc::api::interceptor_registry::register_default_interceptors;
use webrtc::api::media_engine::{MediaEngine, MIME_TYPE_OPUS, MIME_TYPE_VP8};
use webrtc::api::APIBuilder;
use webrtc::ice_transport::ice_server::RTCIceServer;
use webrtc::interceptor::registry::Registry;
use webrtc::peer_connection::configuration::RTCConfiguration;
use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
use webrtc::peer_connection::RTCPeerConnection;
use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
use webrtc::rtp_transceiver::PayloadType;
use webrtc::rtp_transceiver::rtp_codec::{
RTCRtpCodecCapability, RTCRtpCodecParameters, RTPCodecType,
};
use webrtc::rtp_transceiver::PayloadType;

use crate::forward::forward_internal::PeerForwardInternal;

Expand Down

0 comments on commit cac3ca9

Please sign in to comment.