Skip to content

Commit

Permalink
feat: local track emit event
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm committed May 21, 2024
1 parent 9f7ac74 commit 40a73bb
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 22 deletions.
1 change: 1 addition & 0 deletions packages/media_core/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ pub enum EndpointRes {
#[derive(Debug, PartialEq, Eq)]
pub enum EndpointLocalTrackEvent {
Media(MediaPacket),
Status(protobuf::shared::receiver::Status),
}

/// This is used for controlling the remote track, which is sent from endpoint
Expand Down
42 changes: 37 additions & 5 deletions packages/media_core/src/endpoint/internal/local_track.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use atm0s_sdn::TimePivot;
use media_server_protocol::{
endpoint::{PeerId, TrackName, TrackPriority},
media::MediaKind,
protobuf::shared::receiver::Status as ProtoStatus,
transport::RpcError,
};
use sans_io_runtime::{return_if_none, Task, TaskSwitcherChild};
Expand All @@ -25,6 +26,8 @@ use super::bitrate_allocator::EgressAction;

mod packet_selector;

const MEDIA_TIMEOUT_MS: u64 = 2_000; //after 2s not receive media, the track will become inactive

pub enum Input {
JoinRoom(ClusterRoomHash),
LeaveRoom,
Expand All @@ -43,10 +46,17 @@ pub enum Output {
Stopped(MediaKind),
}

#[derive(Debug, PartialEq, Eq)]
enum Status {
Waiting,
Active { last_media_ts: u64 },
Inactive,
}

pub struct EndpointLocalTrack {
kind: MediaKind,
room: Option<ClusterRoomHash>,
bind: Option<(PeerId, TrackName)>,
bind: Option<(PeerId, TrackName, Status)>,
queue: VecDeque<Output>,
selector: PacketSelector,
timer: TimePivot,
Expand Down Expand Up @@ -76,7 +86,7 @@ impl EndpointLocalTrack {
assert_ne!(self.room, None);
let room = return_if_none!(self.room.take());

Check warning on line 87 in packages/media_core/src/endpoint/internal/local_track.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal/local_track.rs#L87

Added line #L87 was not covered by tests
log::info!("[EndpointLocalTrack] leave room {room}");
let (peer, track) = return_if_none!(self.bind.take());
let (peer, track, _) = return_if_none!(self.bind.take());

Check warning on line 89 in packages/media_core/src/endpoint/internal/local_track.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal/local_track.rs#L89

Added line #L89 was not covered by tests
log::info!("[EndpointLocalTrack] leave room {room} => auto Unsubscribe {peer} {track}");
self.queue.push_back(Output::Cluster(room, ClusterLocalTrackControl::Unsubscribe));

Check warning on line 91 in packages/media_core/src/endpoint/internal/local_track.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal/local_track.rs#L91

Added line #L91 was not covered by tests
}
Expand All @@ -94,6 +104,19 @@ impl EndpointLocalTrack {
let now_ms = self.timer.timestamp_ms(now);
if self.selector.select(self.timer.timestamp_ms(now), channel, &mut pkt).is_some() {
self.pop_selector(now_ms);

Check warning on line 106 in packages/media_core/src/endpoint/internal/local_track.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal/local_track.rs#L105-L106

Added lines #L105 - L106 were not covered by tests

if let Some((_, _, status)) = &mut self.bind {
match status {
Status::Waiting | Status::Inactive => {
*status = Status::Active { last_media_ts: now_ms };
self.queue.push_back(Output::Event(EndpointLocalTrackEvent::Status(ProtoStatus::Active)));
}
Status::Active { last_media_ts } => {
*last_media_ts = now_ms;
}

Check warning on line 116 in packages/media_core/src/endpoint/internal/local_track.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal/local_track.rs#L108-L116

Added lines #L108 - L116 were not covered by tests
}
}

Check warning on line 118 in packages/media_core/src/endpoint/internal/local_track.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal/local_track.rs#L118

Added line #L118 was not covered by tests

self.queue.push_back(Output::Event(EndpointLocalTrackEvent::Media(pkt)));
}

Check warning on line 121 in packages/media_core/src/endpoint/internal/local_track.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal/local_track.rs#L120-L121

Added lines #L120 - L121 were not covered by tests
}
Expand Down Expand Up @@ -126,12 +149,12 @@ impl EndpointLocalTrack {
let track = source.track;
let now_ms = self.timer.timestamp_ms(now);

Check warning on line 150 in packages/media_core/src/endpoint/internal/local_track.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal/local_track.rs#L141-L150

Added lines #L141 - L150 were not covered by tests
log::info!("[EndpointLocalTrack] view room {room} peer {peer} track {track}");
if let Some((_peer, _track)) = self.bind.take() {
if let Some((_peer, _track, _status)) = self.bind.take() {

Check warning on line 152 in packages/media_core/src/endpoint/internal/local_track.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal/local_track.rs#L152

Added line #L152 was not covered by tests
log::info!("[EndpointLocalTrack] view room {room} peer {peer} track {track} => unsubscribe current {_peer} {_track}");
self.queue.push_back(Output::Cluster(*room, ClusterLocalTrackControl::Unsubscribe));
self.queue.push_back(Output::Stopped(self.kind));
}
self.bind = Some((peer.clone(), track.clone()));
self.bind = Some((peer.clone(), track.clone(), Status::Waiting));
self.selector.set_limit_layer(now_ms, config.max_spatial, config.max_temporal);
self.queue.push_back(Output::Started(self.kind, config.priority));

Check warning on line 159 in packages/media_core/src/endpoint/internal/local_track.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal/local_track.rs#L157-L159

Added lines #L157 - L159 were not covered by tests
self.queue.push_back(Output::Cluster(*room, ClusterLocalTrackControl::Subscribe(peer, track)));
Expand All @@ -145,7 +168,7 @@ impl EndpointLocalTrack {
EndpointLocalTrackReq::Detach() => {
//TODO process config here
if let Some(room) = self.room.as_ref() {
if let Some((peer, track)) = self.bind.take() {
if let Some((peer, track, _)) = self.bind.take() {
self.queue.push_back(Output::RpcRes(req_id, EndpointLocalTrackRes::Detach(Ok(()))));

Check warning on line 172 in packages/media_core/src/endpoint/internal/local_track.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal/local_track.rs#L171-L172

Added lines #L171 - L172 were not covered by tests
self.queue.push_back(Output::Stopped(self.kind));
self.queue.push_back(Output::Cluster(*room, ClusterLocalTrackControl::Unsubscribe));
Expand Down Expand Up @@ -205,6 +228,15 @@ impl Task<Input, Output> for EndpointLocalTrack {
let now_ms = self.timer.timestamp_ms(now);
self.selector.on_tick(now_ms);
self.pop_selector(now_ms);

if let Some((_, _, status)) = &mut self.bind {
if let Status::Active { last_media_ts } = status {
if now_ms >= *last_media_ts + MEDIA_TIMEOUT_MS {
*status = Status::Inactive;
self.queue.push_back(Output::Event(EndpointLocalTrackEvent::Status(ProtoStatus::Inactive)));
}
}
}

Check warning on line 239 in packages/media_core/src/endpoint/internal/local_track.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal/local_track.rs#L232-L239

Added lines #L232 - L239 were not covered by tests
}

fn on_event(&mut self, now: Instant, input: Input) {

Check warning on line 242 in packages/media_core/src/endpoint/internal/local_track.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal/local_track.rs#L242

Added line #L242 was not covered by tests
Expand Down
8 changes: 3 additions & 5 deletions packages/protocol/proto/shared.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@ enum Kind {

message Receiver {
enum Status {
NO_SOURCE = 0;
WAITING = 1;
LIVE = 2;
KEY_ONLY = 3;
INACTIVE = 4;
WAITING = 0;
ACTIVE = 1;
INACTIVE = 2;
}

message Source {
Expand Down
16 changes: 5 additions & 11 deletions packages/protocol/src/protobuf/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,9 @@ pub mod receiver {
)]
#[repr(i32)]
pub enum Status {
NoSource = 0,
Waiting = 1,
Live = 2,
KeyOnly = 3,
Inactive = 4,
Waiting = 0,
Active = 1,
Inactive = 2,
}
impl Status {
/// String value of the enum field names used in the ProtoBuf definition.
Expand All @@ -67,20 +65,16 @@ pub mod receiver {
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
Status::NoSource => "NO_SOURCE",
Status::Waiting => "WAITING",
Status::Live => "LIVE",
Status::KeyOnly => "KEY_ONLY",
Status::Active => "ACTIVE",
Status::Inactive => "INACTIVE",

Check warning on line 70 in packages/protocol/src/protobuf/shared.rs

View check run for this annotation

Codecov / codecov/patch

packages/protocol/src/protobuf/shared.rs#L66-L70

Added lines #L66 - L70 were not covered by tests
}
}

Check warning on line 72 in packages/protocol/src/protobuf/shared.rs

View check run for this annotation

Codecov / codecov/patch

packages/protocol/src/protobuf/shared.rs#L72

Added line #L72 was not covered by tests
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"NO_SOURCE" => Some(Self::NoSource),
"WAITING" => Some(Self::Waiting),
"LIVE" => Some(Self::Live),
"KEY_ONLY" => Some(Self::KeyOnly),
"ACTIVE" => Some(Self::Active),
"INACTIVE" => Some(Self::Inactive),
_ => None,

Check warning on line 79 in packages/protocol/src/protobuf/shared.rs

View check run for this annotation

Codecov / codecov/patch

packages/protocol/src/protobuf/shared.rs#L74-L79

Added lines #L74 - L79 were not covered by tests
}
Expand Down
11 changes: 10 additions & 1 deletion packages/transport_webrtc/src/transport/webrtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ use media_server_protocol::{
self,
conn::{
server_event::{
receiver::{Event as ProtoReceiverEvent, State as ProtoReceiverState},
room::{Event as ProtoRoomEvent2, PeerJoined, PeerLeaved, TrackStarted, TrackStopped},
sender::{Event as ProtoSenderEvent, State as ProtoSenderState},
Event as ProtoServerEvent, Receiver as ProtoReceiverEventContainer, Room as ProtoRoomEvent, Sender as ProtoSenderEventContainer,
},
ClientEvent,
},
gateway::ConnectRequest,
shared::{receiver::Status as ProtoReceiverStatus, sender::Status as ProtoSenderStatus, Kind},
shared::{sender::Status as ProtoSenderStatus, Kind},
},
transport::{RpcError, RpcResult},
};
Expand Down Expand Up @@ -248,6 +249,14 @@ impl TransportWebrtcInternal for TransportWebrtcSdk {
log::trace!("[TransportWebrtcSdk] send {:?} size {}", pkt.meta, pkt.data.len());
self.queue.push_back(InternalOutput::Str0mSendMedia(mid, pkt))

Check warning on line 250 in packages/transport_webrtc/src/transport/webrtc.rs

View check run for this annotation

Codecov / codecov/patch

packages/transport_webrtc/src/transport/webrtc.rs#L242-L250

Added lines #L242 - L250 were not covered by tests
}
EndpointLocalTrackEvent::Status(status) => {
let track = return_if_none!(self.local_track(track_id)).name().to_string();
log::info!("[TransportWebrtcSdk] track {track} set status {:?}", status);
self.send_event(ProtoServerEvent::Receiver(ProtoReceiverEventContainer {
name: track,
event: Some(ProtoReceiverEvent::State(ProtoReceiverState { status: status as i32 })),
}));

Check warning on line 258 in packages/transport_webrtc/src/transport/webrtc.rs

View check run for this annotation

Codecov / codecov/patch

packages/transport_webrtc/src/transport/webrtc.rs#L252-L258

Added lines #L252 - L258 were not covered by tests
}
},
EndpointEvent::BweConfig { current, desired } => {
let (current, desired) = self.bwe_state.filter_bwe_config(current, desired);
Expand Down
1 change: 1 addition & 0 deletions packages/transport_webrtc/src/transport/whep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ impl TransportWebrtcInternal for TransportWebrtcWhep {
};
self.queue.push_back(InternalOutput::Str0mSendMedia(mid, pkt));

Check warning on line 149 in packages/transport_webrtc/src/transport/whep.rs

View check run for this annotation

Codecov / codecov/patch

packages/transport_webrtc/src/transport/whep.rs#L149

Added line #L149 was not covered by tests
}
EndpointLocalTrackEvent::Status(_) => {}

Check warning on line 151 in packages/transport_webrtc/src/transport/whep.rs

View check run for this annotation

Codecov / codecov/patch

packages/transport_webrtc/src/transport/whep.rs#L151

Added line #L151 was not covered by tests
},
EndpointEvent::RemoteMediaTrack(_track, _event) => {}

Check warning on line 153 in packages/transport_webrtc/src/transport/whep.rs

View check run for this annotation

Codecov / codecov/patch

packages/transport_webrtc/src/transport/whep.rs#L153

Added line #L153 was not covered by tests
EndpointEvent::BweConfig { current, desired } => {
Expand Down

0 comments on commit 40a73bb

Please sign in to comment.