Skip to content

Commit

Permalink
feat: create new session if ice-restart request with not exists conn_id
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm committed May 21, 2024
1 parent 40a73bb commit 516759b
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 17 deletions.
11 changes: 7 additions & 4 deletions packages/media_core/src/endpoint/internal/local_track.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl EndpointLocalTrack {
fn on_join_room(&mut self, _now: Instant, room: ClusterRoomHash) {

Check warning on line 78 in packages/media_core/src/endpoint/internal/local_track.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal/local_track.rs#L78

Added line #L78 was not covered by tests
assert_eq!(self.room, None);
assert_eq!(self.bind, None);
log::info!("[EndpointLocalTrack] join room {room}");
log::info!("[EndpointLocalTrack] track {} join room {room}", self.kind);

Check warning on line 81 in packages/media_core/src/endpoint/internal/local_track.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal/local_track.rs#L81

Added line #L81 was not covered by tests
self.room = Some(room);
}

Expand Down Expand Up @@ -148,9 +148,12 @@ impl EndpointLocalTrack {
let peer = source.peer;
let track = source.track;
let now_ms = self.timer.timestamp_ms(now);
log::info!("[EndpointLocalTrack] view room {room} peer {peer} track {track}");
log::info!("[EndpointLocalTrack] track {} view room {room} peer {peer} track {track}", self.kind);
if let Some((_peer, _track, _status)) = self.bind.take() {
log::info!("[EndpointLocalTrack] view room {room} peer {peer} track {track} => unsubscribe current {_peer} {_track}");
log::info!(
"[EndpointLocalTrack] track {} view room {room} peer {peer} track {track} => unsubscribe current {_peer} {_track}",

Check warning on line 154 in packages/media_core/src/endpoint/internal/local_track.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal/local_track.rs#L141-L154

Added lines #L141 - L154 were not covered by tests
self.kind
);
self.queue.push_back(Output::Cluster(*room, ClusterLocalTrackControl::Unsubscribe));
self.queue.push_back(Output::Stopped(self.kind));
}
Expand All @@ -160,7 +163,7 @@ impl EndpointLocalTrack {
self.queue.push_back(Output::Cluster(*room, ClusterLocalTrackControl::Subscribe(peer, track)));
self.selector.reset();
} else {
log::warn!("[EndpointLocalTrack] view but not in room");
log::warn!("[EndpointLocalTrack] track {} view but not in room", self.kind);
self.queue
.push_back(Output::RpcRes(req_id, EndpointLocalTrackRes::Attach(Err(RpcError::new2(EndpointErrors::EndpointNotInRoom)))));

Check warning on line 168 in packages/media_core/src/endpoint/internal/local_track.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal/local_track.rs#L166-L168

Added lines #L166 - L168 were not covered by tests
}
Expand Down
4 changes: 2 additions & 2 deletions packages/media_runner/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,14 +314,14 @@ impl MediaServerWorker {
Err(e) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Webrtc(webrtc::RpcRes::Connect(Err(e))))),

Check warning on line 314 in packages/media_runner/src/worker.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_runner/src/worker.rs#L301-L314

Added lines #L301 - L314 were not covered by tests
},
webrtc::RpcReq::RemoteIce(conn, ice) => {
log::info!("on rcp request {req_id}, webrtc::RpcReq::RemoteIce");
log::info!("on rpc request {req_id}, webrtc::RpcReq::RemoteIce");
self.media_webrtc.input(&mut self.switcher).on_event(
now,
GroupInput::Ext(conn.into(), transport_webrtc::ExtIn::RemoteIce(req_id, transport_webrtc::Variant::Webrtc, ice.candidates)),
);

Check warning on line 321 in packages/media_runner/src/worker.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_runner/src/worker.rs#L316-L321

Added lines #L316 - L321 were not covered by tests
}
webrtc::RpcReq::RestartIce(conn, ip, token, user_agent, req) => {
log::info!("on rcp request {req_id}, webrtc::RpcReq::RestartIce");
log::info!("on rpc request {req_id}, webrtc::RpcReq::RestartIce");
self.media_webrtc.input(&mut self.switcher).on_event(
now,
GroupInput::Ext(conn.into(), transport_webrtc::ExtIn::RestartIce(req_id, transport_webrtc::Variant::Webrtc, ip, token, user_agent, req)),
Expand Down
1 change: 1 addition & 0 deletions packages/transport_webrtc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ pub enum WebrtcError {
RpcTrackNameNotFound = 0x2003,
RpcTrackNotAttached = 0x2004,
RpcTrackAlreadyAttached = 0x2005,
RpcEndpointNotFound = 0x2006,
}
26 changes: 18 additions & 8 deletions packages/transport_webrtc/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use sans_io_runtime::{
use str0m::{
bwe::Bitrate,
change::{DtlsCert, SdpOffer},
channel::ChannelId,
channel::{ChannelConfig, ChannelId},
format::CodecConfig,
ice::IceCreds,
media::{KeyframeRequestKind, Mid},
Expand Down Expand Up @@ -128,20 +128,30 @@ impl TransportWebrtc {
let ice_ufrag = rtc_config.local_ice_credentials().as_ref().expect("should have ice credentials").ufrag.clone();

let mut rtc = rtc_config.build();
rtc.direct_api().enable_twcc_feedback();
let mut internal: Box<dyn TransportWebrtcInternal> = match variant {
VariantParams::Whip(room, peer) => Box::new(whip::TransportWebrtcWhip::new(room, peer)),
VariantParams::Whep(room, peer) => Box::new(whep::TransportWebrtcWhep::new(room, peer)),
VariantParams::Webrtc(_ip, _token, _user_agent, req) => {
rtc.direct_api().create_data_channel(ChannelConfig {
label: "data".to_string(),
negotiated: Some(1000),
..Default::default()
});
//we need to start sctp as client side for handling restart-ice in new server
//if not, datachannel will not connect successful after reconnect to new server
rtc.direct_api().start_sctp(true);
Box::new(webrtc::TransportWebrtcSdk::new(req))

Check warning on line 143 in packages/transport_webrtc/src/transport.rs

View check run for this annotation

Codecov / codecov/patch

packages/transport_webrtc/src/transport.rs#L131-L143

Added lines #L131 - L143 were not covered by tests
}
};

rtc.direct_api().enable_twcc_feedback();

Check warning on line 147 in packages/transport_webrtc/src/transport.rs

View check run for this annotation

Codecov / codecov/patch

packages/transport_webrtc/src/transport.rs#L147

Added line #L147 was not covered by tests
let mut ports = Small2dMap::default();
for (local_addr, slot) in local_addrs {
ports.insert(local_addr, slot);
rtc.add_local_candidate(Candidate::host(local_addr, Protocol::Udp).expect("Should add local candidate"));
}
let answer = rtc.sdp_api().accept_offer(offer).map_err(|_e| RpcError::new2(WebrtcError::InternalServerError))?;

Check warning on line 153 in packages/transport_webrtc/src/transport.rs

View check run for this annotation

Codecov / codecov/patch

packages/transport_webrtc/src/transport.rs#L153

Added line #L153 was not covered by tests
let mut local_convert = LocalMediaConvert::default();
let mut internal: Box<dyn TransportWebrtcInternal> = match variant {
VariantParams::Whip(room, peer) => Box::new(whip::TransportWebrtcWhip::new(room, peer)),
VariantParams::Whep(room, peer) => Box::new(whep::TransportWebrtcWhep::new(room, peer)),
VariantParams::Webrtc(_ip, _token, _user_agent, req) => Box::new(webrtc::TransportWebrtcSdk::new(req)),
};
internal.on_codec_config(rtc.codec_config());
local_convert.set_config(rtc.codec_config());

Expand Down Expand Up @@ -207,7 +217,7 @@ impl TransportWebrtc {
}

Check warning on line 217 in packages/transport_webrtc/src/transport.rs

View check run for this annotation

Codecov / codecov/patch

packages/transport_webrtc/src/transport.rs#L213-L217

Added lines #L213 - L217 were not covered by tests
}
InternalOutput::Str0mResetBwe(init_bitrate) => {
log::info!("Reset str0m bwe to init_bitrate {init_bitrate} bps");
log::info!("[TransportWebrtc] Reset str0m bwe to init_bitrate {init_bitrate} bps");

Check warning on line 220 in packages/transport_webrtc/src/transport.rs

View check run for this annotation

Codecov / codecov/patch

packages/transport_webrtc/src/transport.rs#L220

Added line #L220 was not covered by tests
self.rtc.bwe().reset(init_bitrate.into());
}
InternalOutput::TransportOutput(out) => {
Expand Down
6 changes: 5 additions & 1 deletion packages/transport_webrtc/src/transport/webrtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,14 +372,18 @@ impl TransportWebrtcInternal for TransportWebrtcSdk {
))));

Check warning on line 372 in packages/transport_webrtc/src/transport/webrtc.rs

View check run for this annotation

Codecov / codecov/patch

packages/transport_webrtc/src/transport/webrtc.rs#L369-L372

Added lines #L369 - L372 were not covered by tests
}
Str0mEvent::StreamPaused(event) => {
// We need to map media ssrc here for avoiding unknown pkt
// without it, sometime we will failed to restore session from restart-ice
self.media_convert.get_mid(event.ssrc, Some(event.mid));

Check warning on line 377 in packages/transport_webrtc/src/transport/webrtc.rs

View check run for this annotation

Codecov / codecov/patch

packages/transport_webrtc/src/transport/webrtc.rs#L374-L377

Added lines #L374 - L377 were not covered by tests

let track = return_if_none!(self.remote_track_by_mid(event.mid)).name().to_string();
let status = if event.paused {
ProtoSenderStatus::Inactive

Check warning on line 381 in packages/transport_webrtc/src/transport/webrtc.rs

View check run for this annotation

Codecov / codecov/patch

packages/transport_webrtc/src/transport/webrtc.rs#L379-L381

Added lines #L379 - L381 were not covered by tests
} else {
ProtoSenderStatus::Active

Check warning on line 383 in packages/transport_webrtc/src/transport/webrtc.rs

View check run for this annotation

Codecov / codecov/patch

packages/transport_webrtc/src/transport/webrtc.rs#L383

Added line #L383 was not covered by tests
};

log::info!("[TransportWebrtcSdk] track {track} set status {:?}", status);
log::info!("[TransportWebrtcSdk] track {track} mid {} ssrc {} set status {:?}", event.mid, event.ssrc, status);
self.send_event(ProtoServerEvent::Sender(ProtoSenderEventContainer {
name: track,
event: Some(ProtoSenderEvent::State(ProtoSenderState { status: status as i32 })),
Expand Down
22 changes: 20 additions & 2 deletions packages/transport_webrtc/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use media_server_core::{
cluster::{ClusterEndpointControl, ClusterEndpointEvent, ClusterRoomHash},
endpoint::{Endpoint, EndpointCfg, EndpointInput, EndpointOutput},
};
use media_server_protocol::transport::RpcResult;
use media_server_protocol::transport::{RpcError, RpcResult};
use sans_io_runtime::{
backend::{BackendIncoming, BackendOutgoing},
group_owner_type, group_task, return_if_none, return_if_some, TaskSwitcher, TaskSwitcherChild,
Expand All @@ -14,6 +14,7 @@ use str0m::change::DtlsCert;
use crate::{
shared_port::SharedUdpPort,
transport::{ExtIn, ExtOut, TransportWebrtc, VariantParams},
WebrtcError,
};

group_task!(Endpoints, Endpoint<TransportWebrtc, ExtIn, ExtOut>, EndpointInput<ExtIn>, EndpointOutput<ExtOut>);
Expand Down Expand Up @@ -118,7 +119,24 @@ impl MediaWorkerWebrtc {
}
GroupInput::Ext(owner, ext) => {
log::info!("[MediaWorkerWebrtc] on ext to owner {:?}", owner);
self.endpoints.on_event(now, owner.index(), EndpointInput::Ext(ext));
if let Some(&Some(_)) = self.endpoints.tasks.get(owner.index()) {
self.endpoints.on_event(now, owner.index(), EndpointInput::Ext(ext));
} else {
match ext {
ExtIn::RemoteIce(req_id, variant, ..) => {
self.queue
.push_back(GroupOutput::Ext(owner, ExtOut::RemoteIce(req_id, variant, Err(RpcError::new2(WebrtcError::RpcEndpointNotFound)))));
}
ExtIn::RestartIce(req_id, variant, remote, useragent, token, req) => {
if let Ok((ice_lite, sdp, index)) = self.spawn(VariantParams::Webrtc(remote, useragent, token, req.clone()), &req.sdp) {
self.queue.push_back(GroupOutput::Ext(index.into(), ExtOut::RestartIce(req_id, variant, Ok((ice_lite, sdp)))));
} else {
self.queue
.push_back(GroupOutput::Ext(owner, ExtOut::RestartIce(req_id, variant, Err(RpcError::new2(WebrtcError::RpcEndpointNotFound)))));
}

Check warning on line 136 in packages/transport_webrtc/src/worker.rs

View check run for this annotation

Codecov / codecov/patch

packages/transport_webrtc/src/worker.rs#L122-L136

Added lines #L122 - L136 were not covered by tests
}
}
}
}
GroupInput::Close(owner) => {
self.endpoints.on_event(now, owner.index(), EndpointInput::Close);

Check warning on line 142 in packages/transport_webrtc/src/worker.rs

View check run for this annotation

Codecov / codecov/patch

packages/transport_webrtc/src/worker.rs#L142

Added line #L142 was not covered by tests
Expand Down

0 comments on commit 516759b

Please sign in to comment.