diff --git a/Cargo.toml b/Cargo.toml index e5d371bd..20e8bf70 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,4 +1,5 @@ [workspace] +resolver = "2" members = [ "bin", "packages/protocol", @@ -12,7 +13,7 @@ members = [ ] [workspace.dependencies] -sans-io-runtime = { git = "https://github.com/giangndm/sans-io-runtime.git", rev = "c781cef12b2a435b5e31a6ede69d301a23719452" } +sans-io-runtime = { git = "https://github.com/giangndm/sans-io-runtime.git", rev = "c781cef12b2a435b5e31a6ede69d301a23719452" , default-features = false} atm0s-sdn = { git = "https://github.com/giangndm/8xFF-decentralized-sdn.git", rev = "2ad5d3a092b63f871a90a9600d2fcc8cb3027a24" } tracing-subscriber = { version = "0.3", features = ["env-filter", "std"] } convert-enum = "0.1" diff --git a/bin/src/http/api_console.rs b/bin/src/http/api_console.rs index 757d7108..1c644d07 100644 --- a/bin/src/http/api_console.rs +++ b/bin/src/http/api_console.rs @@ -28,5 +28,5 @@ struct ConsoleAuthorization(()); async fn api_checker(req: &Request, api_key: ApiKey) -> Option<()> { let data = req.data::()?; - data.secure.validate_token(&api_key.key).then(|| ()) + data.secure.validate_token(&api_key.key).then_some(()) } diff --git a/bin/src/http/api_console/connector.rs b/bin/src/http/api_console/connector.rs index 650af84b..e6b671f8 100644 --- a/bin/src/http/api_console/connector.rs +++ b/bin/src/http/api_console/connector.rs @@ -181,6 +181,7 @@ impl Apis { } /// get events + #[allow(clippy::too_many_arguments)] #[oai(path = "/:node/log/events", method = "get")] async fn events( &self, diff --git a/bin/src/quinn/vnet.rs b/bin/src/quinn/vnet.rs index 86179b2d..941510e7 100644 --- a/bin/src/quinn/vnet.rs +++ b/bin/src/quinn/vnet.rs @@ -11,7 +11,7 @@ use super::vsocket::VirtualUdpSocket; #[derive(Debug)] pub struct NetworkPkt { - pub local_port: u16, + pub _local_port: u16, pub remote: NodeId, pub remote_port: u16, pub data: Buffer, @@ -77,7 +77,7 @@ impl VirtualNetwork { let event = event?; match event { socket::Event::RecvFrom(local_port, remote, remote_port, data, meta) => { - let pkt = NetworkPkt { data, local_port, remote, remote_port, meta }; + let pkt = NetworkPkt { data, _local_port: local_port, remote, remote_port, meta }; if let Some(socket_tx) = self.sockets.get(&local_port) { if let Err(e) = socket_tx.try_send(pkt) { log::error!("Send to socket {} error {:?}", local_port, e); diff --git a/bin/src/server/connector.rs b/bin/src/server/connector.rs index 377f224b..f046faeb 100644 --- a/bin/src/server/connector.rs +++ b/bin/src/server/connector.rs @@ -49,7 +49,7 @@ pub struct Args { pub async fn run_media_connector(workers: usize, node: NodeConfig, args: Args) { rustls::crypto::ring::default_provider().install_default().expect("should install ring as default"); - let mut connector_storage = Arc::new(ConnectorStorage::new(&args.db_uri).await); + let connector_storage = Arc::new(ConnectorStorage::new(&args.db_uri).await); let default_cluster_cert_buf = include_bytes!("../../certs/cluster.cert"); let default_cluster_key_buf = include_bytes!("../../certs/cluster.key"); diff --git a/bin/src/server/console/storage.rs b/bin/src/server/console/storage.rs index c6af5998..2c3df85b 100644 --- a/bin/src/server/console/storage.rs +++ b/bin/src/server/console/storage.rs @@ -151,7 +151,7 @@ impl Storage { ClusterNodeInfo::Console(generic) => { let zone_id = node & 0xFF_FF_FF_00; log::info!("Zone {zone_id} on console ping, zones {}", self.zones.len()); - let zone = self.zones.entry(zone_id).or_insert_with(Default::default); + let zone = self.zones.entry(zone_id).or_default(); zone.consoles.insert( node, ConsoleContainer { @@ -165,7 +165,7 @@ impl Storage { ClusterNodeInfo::Gateway(generic, info) => { let zone_id = node & 0xFF_FF_FF_00; log::info!("Zone {zone_id} on gateway ping"); - let zone = self.zones.entry(zone_id).or_insert_with(Default::default); + let zone = self.zones.entry(zone_id).or_default(); zone.lat = info.lat; zone.lon = info.lon; zone.gateways.insert( @@ -181,7 +181,7 @@ impl Storage { ClusterNodeInfo::Media(generic, info) => { let zone_id = node & 0xFF_FF_FF_00; log::info!("Zone {zone_id} on media ping"); - let zone = self.zones.entry(zone_id).or_insert_with(Default::default); + let zone = self.zones.entry(zone_id).or_default(); zone.medias.insert( node, MediaContainer { @@ -195,7 +195,7 @@ impl Storage { ClusterNodeInfo::Connector(generic) => { let zone_id = node & 0xFF_FF_FF_00; log::info!("Zone {zone_id} on connector ping, zones {}", self.zones.len()); - let zone = self.zones.entry(zone_id).or_insert_with(Default::default); + let zone = self.zones.entry(zone_id).or_default(); zone.connectors.insert( node, ConnectorContainer { diff --git a/bin/src/server/gateway.rs b/bin/src/server/gateway.rs index 70ebd5d2..2956d4e9 100644 --- a/bin/src/server/gateway.rs +++ b/bin/src/server/gateway.rs @@ -221,7 +221,7 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod media_server_gateway::store_service::Event::FindNodeRes(req_id, res) => requester.on_find_node_res(req_id, res), }, SdnExtOut::ServicesEvent(_, _, SE::Connector(event)) => match event { - media_server_connector::agent_service::Event::Stats { queue, inflight, acked } => {} + media_server_connector::agent_service::Event::Stats { queue: _, inflight: _, acked: _ } => {} }, SdnExtOut::FeaturesEvent(_, FeaturesEvent::Socket(event)) => { if let Err(e) = vnet_tx.try_send(event) { diff --git a/bin/src/server/media.rs b/bin/src/server/media.rs index c682758e..af21a20e 100644 --- a/bin/src/server/media.rs +++ b/bin/src/server/media.rs @@ -145,7 +145,7 @@ pub async fn run_media_server(workers: usize, http_port: Option, node: Node if let Some(metrics) = node_metrics_collector.pop_measure() { controller.send_to( 0, //because sdn controller allway is run inside worker 0 - ExtIn::NodeStats(metrics).into(), + ExtIn::NodeStats(metrics), ); } while let Ok(control) = vnet_rx.try_recv() { diff --git a/bin/src/server/media/runtime_worker.rs b/bin/src/server/media/runtime_worker.rs index 21797b84..599fdc2f 100644 --- a/bin/src/server/media/runtime_worker.rs +++ b/bin/src/server/media/runtime_worker.rs @@ -6,7 +6,6 @@ use media_server_gateway::NodeMetrics; use media_server_protocol::transport::{RpcReq, RpcRes}; use media_server_runner::{Input as WorkerInput, MediaConfig, MediaServerWorker, Output as WorkerOutput, Owner, UserData, SC, SE, TC, TW}; use media_server_secure::MediaEdgeSecure; -use rand::random; use sans_io_runtime::{BusChannelControl, BusControl, BusEvent, WorkerInner, WorkerInnerInput, WorkerInnerOutput}; use crate::NodeConfig; @@ -58,7 +57,7 @@ impl WorkerInner AudioMixer { for (i, slot) in self.outputs.iter().enumerate() { if let Some(OutputSlotState { audio_level, source }) = slot { if let Some((_, _, lowest_slot_audio_level)) = &mut lowest { - if *audio_level < *lowest_slot_audio_level || (*audio_level == *lowest_slot_audio_level) { + // TODO: We need to process some case we have same audio_level. Just check with smaller only: + // https://github.com/8xFF/atm0s-media-server/pull/328#discussion_r1667336073 + if *audio_level <= *lowest_slot_audio_level { lowest = Some((i, source.clone(), *audio_level)); } } else { diff --git a/packages/media_connector/src/agent_service.rs b/packages/media_connector/src/agent_service.rs index 192fedd5..427a567b 100644 --- a/packages/media_connector/src/agent_service.rs +++ b/packages/media_connector/src/agent_service.rs @@ -33,6 +33,12 @@ pub struct ConnectorAgentService { _tmp: std::marker::PhantomData<(UserData, SC, SE, TC, TW)>, } +impl Default for ConnectorAgentService { + fn default() -> Self { + Self::new() + } +} + impl ConnectorAgentService { pub fn new() -> Self { Self { @@ -159,6 +165,12 @@ pub struct ConnectorAgentServiceBuilder { _tmp: std::marker::PhantomData<(UserData, SC, SE, TC, TW)>, } +impl Default for ConnectorAgentServiceBuilder { + fn default() -> Self { + Self::new() + } +} + impl ConnectorAgentServiceBuilder { pub fn new() -> Self { Self { _tmp: std::marker::PhantomData } diff --git a/packages/media_connector/src/handler_service.rs b/packages/media_connector/src/handler_service.rs index 5f5c98f4..331796c7 100644 --- a/packages/media_connector/src/handler_service.rs +++ b/packages/media_connector/src/handler_service.rs @@ -37,6 +37,12 @@ pub struct ConnectorHandlerService { _tmp: std::marker::PhantomData<(UserData, SC, SE, TC, TW)>, } +impl Default for ConnectorHandlerService { + fn default() -> Self { + Self::new() + } +} + impl ConnectorHandlerService { pub fn new() -> Self { Self { @@ -152,6 +158,12 @@ pub struct ConnectorHandlerServiceBuilder { _tmp: std::marker::PhantomData<(UserData, SC, SE, TC, TW)>, } +impl Default for ConnectorHandlerServiceBuilder { + fn default() -> Self { + Self::new() + } +} + impl ConnectorHandlerServiceBuilder { pub fn new() -> Self { Self { _tmp: std::marker::PhantomData } diff --git a/packages/media_connector/src/msg_queue.rs b/packages/media_connector/src/msg_queue.rs index 8acad050..e81a414c 100644 --- a/packages/media_connector/src/msg_queue.rs +++ b/packages/media_connector/src/msg_queue.rs @@ -30,7 +30,7 @@ impl MessageQueue { pub fn pop(&mut self, now_ms: u64) -> Option<&M> { if let Some(msg_id) = self.pop_retry_msg_id(now_ms) { - let entry = self.inflight_ts.entry(now_ms).or_insert_with(Default::default); + let entry = self.inflight_ts.entry(now_ms).or_default(); entry.push(msg_id); return Some(self.inflight.get(&msg_id).expect("should exist retry_msg_id")); } @@ -38,7 +38,7 @@ impl MessageQueue { if self.inflight.len() < MAX_INFLIGHT { let msg = self.queue.pop_front()?; let msg_id = msg.msg_id(); - let entry = self.inflight_ts.entry(now_ms).or_insert_with(Default::default); + let entry = self.inflight_ts.entry(now_ms).or_default(); entry.push(msg_id); self.inflight.insert(msg_id, msg); self.inflight.get(&msg_id) diff --git a/packages/media_connector/src/sql_storage/migration/m20240626_0001_init.rs b/packages/media_connector/src/sql_storage/migration/m20240626_0001_init.rs index 56dd0daf..32cbb626 100644 --- a/packages/media_connector/src/sql_storage/migration/m20240626_0001_init.rs +++ b/packages/media_connector/src/sql_storage/migration/m20240626_0001_init.rs @@ -121,6 +121,7 @@ impl MigrationTrait for Migration { enum Room { Table, Id, + #[allow(clippy::enum_variant_names)] Room, CreatedAt, } @@ -130,6 +131,7 @@ enum Peer { Table, Id, Room, + #[allow(clippy::enum_variant_names)] Peer, CreatedAt, } @@ -165,6 +167,7 @@ enum Event { NodeTs, Session, CreatedAt, + #[allow(clippy::enum_variant_names)] Event, Meta, } diff --git a/packages/media_core/src/cluster/room/audio_mixer.rs b/packages/media_core/src/cluster/room/audio_mixer.rs index 4eb2b1c0..a117dd2e 100644 --- a/packages/media_core/src/cluster/room/audio_mixer.rs +++ b/packages/media_core/src/cluster/room/audio_mixer.rs @@ -53,6 +53,8 @@ pub enum Output { OnResourceEmpty, } +type AudioMixerManuals = TaskSwitcherBranch, ManualMixer, 4>, (usize, Output)>; + pub struct AudioMixer { room: ClusterRoomHash, mix_channel_id: ChannelId, @@ -64,7 +66,7 @@ pub struct AudioMixer { subscriber1: TaskSwitcherBranch, Output>, subscriber2: TaskSwitcherBranch, Output>, subscriber3: TaskSwitcherBranch, Output>, - manuals: TaskSwitcherBranch, ManualMixer, 4>, (usize, Output)>, + manuals: AudioMixerManuals, switcher: TaskSwitcher, last_tick: Instant, } diff --git a/packages/media_core/src/cluster/room/audio_mixer/manual.rs b/packages/media_core/src/cluster/room/audio_mixer/manual.rs index feeeba19..682c8fad 100644 --- a/packages/media_core/src/cluster/room/audio_mixer/manual.rs +++ b/packages/media_core/src/cluster/room/audio_mixer/manual.rs @@ -52,9 +52,9 @@ impl ManualMixer { fn attach(&mut self, _now: Instant, source: TrackSource) { let channel_id = id_generator::gen_channel_id(self.room, &source.peer, &source.track); - if !self.sources.contains_key(&channel_id) { + if let std::collections::hash_map::Entry::Vacant(e) = self.sources.entry(channel_id) { log::info!("[ClusterManualMixer] add source {:?} => sub {channel_id}", source); - self.sources.insert(channel_id, source); + e.insert(source); self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, pubsub::ChannelControl::SubAuto))); } } @@ -85,7 +85,7 @@ impl ManualMixer { fn detach(&mut self, _now: Instant, source: TrackSource) { let channel_id = id_generator::gen_channel_id(self.room, &source.peer, &source.track); - if let Some(_) = self.sources.remove(&channel_id) { + if self.sources.remove(&channel_id).is_some() { log::info!("[ClusterManualMixer] remove source {:?} => unsub {channel_id}", source); self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, pubsub::ChannelControl::UnsubAuto))); } @@ -124,7 +124,7 @@ impl Task> for ManualMixer { Input::LeaveRoom => { // We need manual release sources because it is from client request, // we cannot ensure client will release it before it disconnect. - let sources = std::mem::replace(&mut self.sources, Default::default()); + let sources = std::mem::take(&mut self.sources); for (channel_id, source) in sources { log::info!("[ClusterManualMixer] remove source {:?} on queue => unsub {channel_id}", source); self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, pubsub::ChannelControl::UnsubAuto))); diff --git a/packages/media_core/src/endpoint/internal.rs b/packages/media_core/src/endpoint/internal.rs index 8ee76529..647a4dd4 100644 --- a/packages/media_core/src/endpoint/internal.rs +++ b/packages/media_core/src/endpoint/internal.rs @@ -41,10 +41,12 @@ pub enum InternalOutput { Destroy, } +type EndpointInternalWaitJoin = Option<(EndpointReqId, RoomId, PeerId, PeerMeta, RoomInfoPublish, RoomInfoSubscribe, Option)>; + pub struct EndpointInternal { cfg: EndpointCfg, state: Option<(Instant, TransportState)>, - wait_join: Option<(EndpointReqId, RoomId, PeerId, PeerMeta, RoomInfoPublish, RoomInfoSubscribe, Option)>, + wait_join: EndpointInternalWaitJoin, joined: Option<(ClusterRoomHash, RoomId, PeerId, Option)>, local_tracks_id: Small2dMap, remote_tracks_id: Small2dMap, diff --git a/packages/media_core/src/endpoint/internal/local_track/packet_selector/video_h264_sim.rs b/packages/media_core/src/endpoint/internal/local_track/packet_selector/video_h264_sim.rs index f053d3be..9411035b 100644 --- a/packages/media_core/src/endpoint/internal/local_track/packet_selector/video_h264_sim.rs +++ b/packages/media_core/src/endpoint/internal/local_track/packet_selector/video_h264_sim.rs @@ -1,4 +1,4 @@ -use std::collections::VecDeque; +use std::{cmp::Ordering, collections::VecDeque}; use media_server_protocol::media::{MediaLayersBitrate, MediaMeta, MediaPacket}; @@ -53,36 +53,42 @@ impl Selector { if let MediaMeta::H264 { key, profile: _, sim: Some(_sim) } = &mut pkt.meta { match (self.current, self.target) { (Some(current), Some(target)) => { - if target < current { - //down spatial => need wait key-frame - if *key { - log::info!("[H264SimSelector] down {} => {} with key", current, target); - ctx.seq_rewrite.reinit(); - ctx.ts_rewrite.reinit(); - self.current = self.target; - } else { - self.queue.push_back(Action::RequestKeyFrame); + match target.cmp(¤t) { + Ordering::Less => { + // down spatial => need wait key-frame + if *key { + log::info!("[H264SimSelector] down {} => {} with key", current, target); + ctx.seq_rewrite.reinit(); + ctx.ts_rewrite.reinit(); + self.current = self.target; + } else { + self.queue.push_back(Action::RequestKeyFrame); + } } - } else if target > current { - //up spatial => need wait key-frame - if *key { - log::info!("[H264SimSelector] up {} => {} with key", current, target); - ctx.seq_rewrite.reinit(); - ctx.ts_rewrite.reinit(); - self.current = Some(target); - } else if !*key { - self.queue.push_back(Action::RequestKeyFrame); + Ordering::Greater => { + // up spatial => need wait key-frame + if *key { + log::info!("[H264SimSelector] up {} => {} with key", current, target); + ctx.seq_rewrite.reinit(); + ctx.ts_rewrite.reinit(); + self.current = Some(target); + } else { + self.queue.push_back(Action::RequestKeyFrame); + } + } + Ordering::Equal => { + // target is equal to current, handle if needed } } } (Some(current), None) => { - //need pause - //TODO wait current frame finished for avoiding interrupt client + // need pause + // TODO: wait current frame finished for avoiding interrupt client log::info!("[H264SimSelector] pause from {}", current); self.current = None; } (None, Some(target)) => { - //need resume or start => need wait key_frame + // need resume or start => need wait key-frame if *key { log::info!("[H264SimSelector] resume to {} with key", target); // with other spatial we have difference tl0xidx and pic_id offset @@ -91,7 +97,7 @@ impl Selector { } } (None, None) => { - //reject + // reject } } } diff --git a/packages/media_core/src/endpoint/internal/local_track/packet_selector/video_vp8_sim.rs b/packages/media_core/src/endpoint/internal/local_track/packet_selector/video_vp8_sim.rs index 4773ae6c..400ba4c0 100644 --- a/packages/media_core/src/endpoint/internal/local_track/packet_selector/video_vp8_sim.rs +++ b/packages/media_core/src/endpoint/internal/local_track/packet_selector/video_vp8_sim.rs @@ -8,7 +8,7 @@ //! //! Note that, in simulcast stream, each spatial layer is independent stream and have independent seq, ts -use std::collections::VecDeque; +use std::{cmp::Ordering, collections::VecDeque}; use media_server_protocol::media::{MediaLayerSelection, MediaLayersBitrate, MediaMeta, MediaPacket}; use media_server_utils::SeqRewrite; @@ -73,71 +73,78 @@ impl Selector { if let MediaMeta::Vp8 { key, sim: Some(sim) } = &mut pkt.meta { match (&mut self.current, &self.target) { (Some(current), Some(target)) => { - //need switch to temporal layer only - if target.spatial == current.spatial { - //change temporal - if target.temporal > current.temporal { - //up temporal => need wait layer_sync - if sim.spatial == current.spatial && sim.temporal <= target.temporal && sim.layer_sync { - log::info!("[Vp8SimSelector] up temporal {},{} => {},{}", current.spatial, current.temporal, target.spatial, target.temporal); - current.temporal = target.temporal; + match target.spatial.cmp(¤t.spatial) { + Ordering::Equal => { + // change temporal + match target.temporal.cmp(¤t.temporal) { + Ordering::Greater => { + // up temporal => need wait layer_sync + if sim.spatial == current.spatial && sim.temporal <= target.temporal && sim.layer_sync { + log::info!("[Vp8SimSelector] up temporal {},{} => {},{}", current.spatial, current.temporal, target.spatial, target.temporal); + current.temporal = target.temporal; + } + } + Ordering::Less => { + // down temporal => do now + log::info!("[Vp8SimSelector] down temporal {},{} => {},{}", current.spatial, current.temporal, target.spatial, target.temporal); + current.temporal = target.temporal; + } + Ordering::Equal => {} } - } else if target.temporal < current.temporal { - //down temporal => do now - log::info!("[Vp8SimSelector] down temporal {},{} => {},{}", current.spatial, current.temporal, target.spatial, target.temporal); - current.temporal = target.temporal; - } - } else if target.spatial < current.spatial { - //down spatial => need wait key-frame - //first we allway down temporal for trying reduce bandwidth - if current.temporal != 0 { - log::info!("[Vp8SimSelector] down spatial then down temporal from {} => 0", current.temporal); - current.temporal = 0; - } - if *key { - log::info!("[Vp8SimSelector] down {},{} => {},{} with key", current.spatial, current.temporal, target.spatial, target.temporal); - // with other spatial we have difference tl0xidx and pic_id offset - // therefore we need reinit both tl0idx and pic_id - ctx.vp8_ctx.tl0idx_rewrite.reinit(); - ctx.vp8_ctx.pic_id_rewrite.reinit(); - ctx.seq_rewrite.reinit(); - ctx.ts_rewrite.reinit(); - current.spatial = target.spatial; - current.temporal = target.temporal; - } else { - self.queue.push_back(Action::RequestKeyFrame); } - } else if target.spatial > current.spatial { - //up spatial => need wait key-frame - //first we try to up temporal for trying increase bandwidth - if sim.spatial == current.spatial && current.temporal != 2 && sim.layer_sync { - log::info!("[Vp8SimSelector] up spatial then up temporal from {} => 2 before key arrived", current.temporal); - current.temporal = 2; + Ordering::Less => { + // down spatial => need wait key-frame + // first we always down temporal for trying to reduce bandwidth + if current.temporal != 0 { + log::info!("[Vp8SimSelector] down spatial then down temporal from {} => 0", current.temporal); + current.temporal = 0; + } + if *key { + log::info!("[Vp8SimSelector] down {},{} => {},{} with key", current.spatial, current.temporal, target.spatial, target.temporal); + // with other spatial we have difference tl0xidx and pic_id offset + // therefore we need reinit both tl0idx and pic_id + ctx.vp8_ctx.tl0idx_rewrite.reinit(); + ctx.vp8_ctx.pic_id_rewrite.reinit(); + ctx.seq_rewrite.reinit(); + ctx.ts_rewrite.reinit(); + current.spatial = target.spatial; + current.temporal = target.temporal; + } else { + self.queue.push_back(Action::RequestKeyFrame); + } } + Ordering::Greater => { + // up spatial => need wait key-frame + // first we try to up temporal for trying to increase bandwidth + if sim.spatial == current.spatial && current.temporal != 2 && sim.layer_sync { + log::info!("[Vp8SimSelector] up spatial then up temporal from {} => 2 before key arrived", current.temporal); + current.temporal = 2; + } - if *key { - log::info!("[Vp8SimSelector] up {},{} => {},{} with key-frame", current.spatial, current.temporal, target.spatial, target.temporal); - // with other spatial we have difference tl0xidx and pic_id offset - // therefore we need reinit both tl0idx and pic_id - ctx.vp8_ctx.tl0idx_rewrite.reinit(); - ctx.vp8_ctx.pic_id_rewrite.reinit(); - ctx.seq_rewrite.reinit(); - ctx.ts_rewrite.reinit(); - current.spatial = target.spatial; - current.temporal = target.temporal; - } else { - self.queue.push_back(Action::RequestKeyFrame); + if *key { + log::info!("[Vp8SimSelector] up {},{} => {},{} with key-frame", current.spatial, current.temporal, target.spatial, target.temporal); + // with other spatial we have difference tl0xidx and pic_id offset + // therefore we need reinit both tl0idx and pic_id + ctx.vp8_ctx.tl0idx_rewrite.reinit(); + ctx.vp8_ctx.pic_id_rewrite.reinit(); + ctx.seq_rewrite.reinit(); + ctx.ts_rewrite.reinit(); + current.spatial = target.spatial; + current.temporal = target.temporal; + } else { + self.queue.push_back(Action::RequestKeyFrame); + } } } } (Some(current), None) => { - //need pause - //TODO wait current frame finished for avoiding interrupt client + // need pause + // TODO: wait current frame finished for avoiding interrupt client log::info!("[Vp8SimSelector] pause from {},{}", current.spatial, current.temporal); self.current = None; } (None, Some(target)) => { - //need resume or start => need wait key_frame + // need resume or start => need wait key_frame if *key { log::info!("[Vp8SimSelector] resume to {},{} with key", target.spatial, target.temporal); // with other spatial we have difference tl0xidx and pic_id offset @@ -148,7 +155,7 @@ impl Selector { } } (None, None) => { - //reject + // reject } } } diff --git a/packages/media_core/src/endpoint/internal/local_track/packet_selector/video_vp9_svc.rs b/packages/media_core/src/endpoint/internal/local_track/packet_selector/video_vp9_svc.rs index a17b0d15..5f371250 100644 --- a/packages/media_core/src/endpoint/internal/local_track/packet_selector/video_vp9_svc.rs +++ b/packages/media_core/src/endpoint/internal/local_track/packet_selector/video_vp9_svc.rs @@ -13,7 +13,7 @@ //! - K-SVC: we need key-frame for up and down layer //! - Full-SVCL we only need key-frame for up, and only end-frame flag for down layer -use std::collections::VecDeque; +use std::{cmp::Ordering, collections::VecDeque}; use media_server_protocol::media::{MediaLayerSelection, MediaLayersBitrate, MediaMeta, MediaPacket}; use media_server_utils::SeqRewrite; @@ -86,62 +86,69 @@ impl Selector { if let MediaMeta::Vp9 { key, profile: _, svc: Some(svc) } = &mut pkt.meta { match (&mut self.current, &self.target) { (Some(current), Some(target)) => { - //need switch to temporal layer only - if target.spatial == current.spatial { - //change temporal - if target.temporal > current.temporal { - //up temporal => need wait switching_point and pre frame is end - if svc.spatial == current.spatial && svc.temporal > current.temporal && svc.switching_point && self.pre_end_frame { - log::info!("[Vp9SvcSelector] up temporal {},{} => {},{}", current.spatial, current.temporal, target.spatial, target.temporal); - current.temporal = target.temporal; + match target.spatial.cmp(¤t.spatial) { + Ordering::Equal => { + // change temporal + match target.temporal.cmp(¤t.temporal) { + Ordering::Greater => { + // up temporal => need wait switching_point and pre frame is end + if svc.spatial == current.spatial && svc.temporal > current.temporal && svc.switching_point && self.pre_end_frame { + log::info!("[Vp9SvcSelector] up temporal {},{} => {},{}", current.spatial, current.temporal, target.spatial, target.temporal); + current.temporal = target.temporal; + } + } + Ordering::Less => { + // down temporal => need wait end_frame + if self.pre_end_frame { + log::info!("[Vp9SvcSelector] down temporal {},{} => {},{}", current.spatial, current.temporal, target.spatial, target.temporal); + current.temporal = target.temporal; + } + } + Ordering::Equal => {} + } + } + Ordering::Less => { + // down spatial => need wait key-frame + // first we always down temporal for trying to reduce bandwidth + if current.temporal != 0 && self.pre_end_frame { + log::info!("[Vp9SvcSelector] down spatial then down temporal from {} => 0", current.temporal); + current.temporal = 0; } - } else if target.temporal < current.temporal { - //down temporal => need wait end_frame - if self.pre_end_frame { - log::info!("[Vp9SvcSelector] down temporal {},{} => {},{}", current.spatial, current.temporal, target.spatial, target.temporal); + // In K-SVC we must wait for a keyframe. + // In full SVC we do not need a keyframe. + if (self.k_svc && *key) || (!self.k_svc && self.pre_end_frame) { + log::info!("[Vp9SvcSelector] down {},{} => {},{} with key", current.spatial, current.temporal, target.spatial, target.temporal); + // with other spatial we have difference tl0xidx and pic_id offset + // therefore we need reinit both tl0idx and pic_id + ctx.vp9_ctx.pic_id_rewrite.reinit(); + ctx.seq_rewrite.reinit(); + ctx.ts_rewrite.reinit(); + current.spatial = target.spatial; current.temporal = target.temporal; + } else if self.k_svc { + self.queue.push_back(Action::RequestKeyFrame); } } - } else if target.spatial < current.spatial { - // down spatial => need wait key-frame - // first we allway down temporal for trying reduce bandwidth - if current.temporal != 0 && self.pre_end_frame { - log::info!("[Vp9SvcSelector] down spatial then down temporal from {} => 0", current.temporal); - current.temporal = 0; - } - // In K-SVC we must wait for a keyframe. - // In full SVC we do not need a keyframe. - if (self.k_svc && *key) || (!self.k_svc && self.pre_end_frame) { - log::info!("[Vp9SvcSelector] down {},{} => {},{} with key", current.spatial, current.temporal, target.spatial, target.temporal); - // with other spatial we have difference tl0xidx and pic_id offset - // therefore we need reinit both tl0idx and pic_id - ctx.vp9_ctx.pic_id_rewrite.reinit(); - ctx.seq_rewrite.reinit(); - ctx.ts_rewrite.reinit(); - current.spatial = target.spatial; - current.temporal = target.temporal; - } else if self.k_svc { - self.queue.push_back(Action::RequestKeyFrame); - } - } else if target.spatial > current.spatial { - // up spatial => need wait key-frame - // first we try to up temporal for trying increase bandwidth - if svc.spatial == current.spatial && svc.temporal > current.temporal && current.temporal != 2 && svc.switching_point && self.pre_end_frame { - log::info!("[Vp9SvcSelector] up spatial then up temporal from {} => 2 before key arrived", current.temporal); - current.temporal = 2; - } + Ordering::Greater => { + // up spatial => need wait key-frame + // first we try to up temporal for trying to increase bandwidth + if svc.spatial == current.spatial && svc.temporal > current.temporal && current.temporal != 2 && svc.switching_point && self.pre_end_frame { + log::info!("[Vp9SvcSelector] up spatial then up temporal from {} => 2 before key arrived", current.temporal); + current.temporal = 2; + } - if *key { - log::info!("[Vp9SvcSelector] up {},{} => {},{} with key-frame", current.spatial, current.temporal, target.spatial, target.temporal); - // with other spatial we have difference tl0xidx and pic_id offset - // therefore we need reinit both tl0idx and pic_id - ctx.vp9_ctx.pic_id_rewrite.reinit(); - ctx.seq_rewrite.reinit(); - ctx.ts_rewrite.reinit(); - current.spatial = target.spatial; - current.temporal = target.temporal; - } else { - self.queue.push_back(Action::RequestKeyFrame); + if *key { + log::info!("[Vp9SvcSelector] up {},{} => {},{} with key-frame", current.spatial, current.temporal, target.spatial, target.temporal); + // with other spatial we have difference tl0xidx and pic_id offset + // therefore we need reinit both tl0idx and pic_id + ctx.vp9_ctx.pic_id_rewrite.reinit(); + ctx.seq_rewrite.reinit(); + ctx.ts_rewrite.reinit(); + current.spatial = target.spatial; + current.temporal = target.temporal; + } else { + self.queue.push_back(Action::RequestKeyFrame); + } } } } @@ -163,7 +170,7 @@ impl Selector { } } (None, None) => { - //reject + // reject } } diff --git a/packages/media_core/src/endpoint/internal/remote_track.rs b/packages/media_core/src/endpoint/internal/remote_track.rs index 551f6634..088f484f 100644 --- a/packages/media_core/src/endpoint/internal/remote_track.rs +++ b/packages/media_core/src/endpoint/internal/remote_track.rs @@ -132,13 +132,13 @@ impl EndpointRemoteTrack { //TODO clear self.last_layer if switched to new track if media.layers.is_some() { log::debug!("[EndpointRemoteTrack] on layers info {:?}", media.layers); - self.last_layers = media.layers.clone(); + self.last_layers.clone_from(&media.layers); } // We restore last_layer if key frame not contain for allow consumers fast switching if media.meta.is_video_key() && media.layers.is_none() && self.last_layers.is_some() { log::debug!("[EndpointRemoteTrack] set layers info to key-frame {:?}", media.layers); - media.layers = self.last_layers.clone(); + media.layers.clone_from(&self.last_layers); } let room = return_if_none!(self.room.as_ref()); diff --git a/packages/media_gateway/src/agent_service.rs b/packages/media_gateway/src/agent_service.rs index 33216c21..847d2b8f 100644 --- a/packages/media_gateway/src/agent_service.rs +++ b/packages/media_gateway/src/agent_service.rs @@ -25,12 +25,12 @@ struct ServiceWorkersStats { workers: HashMap, } -impl Into for &ServiceWorkersStats { - fn into(self) -> ServiceStats { - ServiceStats { - live: self.workers.values().sum(), - max: self.max, - active: true, //TODO how to update this? maybe with gradeful-shutdown +impl From<&ServiceWorkersStats> for ServiceStats { + fn from(value: &ServiceWorkersStats) -> Self { + Self { + live: value.workers.values().sum(), + max: value.max, + active: true, //TODO how to update this? maybe with graceful-shutdown } } } diff --git a/packages/media_secure/src/jwt.rs b/packages/media_secure/src/jwt.rs index 4f0f674a..a28f0ebb 100644 --- a/packages/media_secure/src/jwt.rs +++ b/packages/media_secure/src/jwt.rs @@ -2,8 +2,8 @@ use crate::{MediaConsoleSecure, MediaEdgeSecure, MediaGatewaySecure}; use jwt_simple::prelude::*; use serde::{de::DeserializeOwned, Serialize}; -const CONN_ID_TYPE: &'static str = "conn"; -const CONSOLE_SESSION_TYPE: &'static str = "console_session"; +const CONN_ID_TYPE: &str = "conn"; +const CONSOLE_SESSION_TYPE: &str = "console_session"; pub struct MediaEdgeSecureJwt { key: HS256Key, diff --git a/packages/transport_webrtc/src/media/vp9.rs b/packages/transport_webrtc/src/media/vp9.rs index 59523eec..e41b9ec9 100644 --- a/packages/transport_webrtc/src/media/vp9.rs +++ b/packages/transport_webrtc/src/media/vp9.rs @@ -35,7 +35,7 @@ pub fn rewrite_rtp(payload: &mut [u8], svc: &Vp9Svc) { } } -#[allow(unused)] +#[allow(unused, clippy::enum_variant_names)] enum PacketError { ErrShortPacket, ErrTooManySpatialLayers, @@ -90,9 +90,9 @@ struct Vp9Header { pub g: bool, /// N_G indicates the number of pictures in a Picture Group (PG) pub ng: u8, - /// + /// Width pub width: Vec, - /// + /// Height pub height: Vec, /// Temporal layer ID of pictures in a Picture Group pub pgtid: Vec, diff --git a/packages/transport_webrtc/src/transport.rs b/packages/transport_webrtc/src/transport.rs index 22673de6..aad12291 100644 --- a/packages/transport_webrtc/src/transport.rs +++ b/packages/transport_webrtc/src/transport.rs @@ -42,6 +42,7 @@ mod webrtc; mod whep; mod whip; +#[allow(clippy::large_enum_variant)] pub enum VariantParams { Whip(RoomId, PeerId), Whep(RoomId, PeerId), @@ -55,6 +56,7 @@ pub enum Variant { Webrtc, } +#[allow(clippy::large_enum_variant)] pub enum ExtIn { RemoteIce(u64, Variant, Vec), RestartIce(u64, Variant, IpAddr, String, ConnectRequest), diff --git a/packages/transport_webrtc/src/transport/webrtc.rs b/packages/transport_webrtc/src/transport/webrtc.rs index 5193a097..5ca59fde 100644 --- a/packages/transport_webrtc/src/transport/webrtc.rs +++ b/packages/transport_webrtc/src/transport/webrtc.rs @@ -98,17 +98,14 @@ impl TransportWebrtcSdk { join: Some((j.room.into(), j.peer.into(), j.metadata, j.publish.unwrap_or_default().into(), j.subscribe.unwrap_or_default().into())), state: State::New, audio_mixer: j.features.and_then(|f| { - f.mixer.and_then(|m| { - Some(AudioMixerConfig { - mode: m.mode().into(), - outputs: m - .outputs - .iter() - .map(|r| local_tracks.iter().find(|l| l.name() == r.as_str()).map(|l| l.id())) - .flatten() - .collect::>(), - sources: m.sources.into_iter().map(|s| s.into()).collect::>(), - }) + f.mixer.map(|m| AudioMixerConfig { + mode: m.mode().into(), + outputs: m + .outputs + .iter() + .filter_map(|r| local_tracks.iter().find(|l| l.name() == r.as_str()).map(|l| l.id())) + .collect::>(), + sources: m.sources.into_iter().map(|s| s.into()).collect::>(), }) }), local_tracks, @@ -616,14 +613,13 @@ impl TransportWebrtcSdk { Some(protobuf::session::request::Request::Room(_room)) => { todo!() } - Some(protobuf::session::request::Request::Features(features_req)) => match features_req.request { - Some(protobuf::features::request::Request::Mixer(mixer_req)) => { + Some(protobuf::session::request::Request::Features(features_req)) => { + if let Some(protobuf::features::request::Request::Mixer(mixer_req)) = features_req.request { if let Some(mixer_req) = mixer_req.request { self.on_mixer_req(req.req_id, mixer_req); } } - None => {} - }, + } None => self.send_rpc_res_err(req.req_id, RpcError::new2(WebrtcError::RpcInvalidRequest)), }, } @@ -641,12 +637,10 @@ impl TransportWebrtcSdk { if let Some(token) = self.secure.decode_obj::("webrtc", &req.token) { if token.room == Some(info.room.clone()) && token.peer == Some(info.peer.clone()) { let mixer_cfg = info.features.and_then(|f| { - f.mixer.and_then(|m| { - Some(AudioMixerConfig { - mode: m.mode().into(), - outputs: m.outputs.iter().map(|r| self.local_track_by_name(r.as_str()).map(|l| l.id())).flatten().collect::>(), - sources: m.sources.into_iter().map(|s| s.into()).collect::>(), - }) + f.mixer.map(|m| AudioMixerConfig { + mode: m.mode().into(), + outputs: m.outputs.iter().filter_map(|r| self.local_track_by_name(r.as_str()).map(|l| l.id())).collect::>(), + sources: m.sources.into_iter().map(|s| s.into()).collect::>(), }) }); self.queue.push_back(build_req(EndpointReq::JoinRoom( diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 00000000..87493917 --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,3 @@ +[toolchain] +channel = "1.79.0" +components = ["rustfmt", "clippy"]