diff --git a/bin/src/http.rs b/bin/src/http.rs index 07de7460..6829c8d2 100644 --- a/bin/src/http.rs +++ b/bin/src/http.rs @@ -36,6 +36,7 @@ impl Rpc { (Self { req, answer_tx }, answer_rx) } + #[allow(unused)] pub fn res(self, res: Res) { let _ = self.answer_tx.send(res); } diff --git a/bin/src/http/utils/payload_sdp.rs b/bin/src/http/utils/payload_sdp.rs index dcdd0057..d4488678 100644 --- a/bin/src/http/utils/payload_sdp.rs +++ b/bin/src/http/utils/payload_sdp.rs @@ -35,10 +35,7 @@ impl Payload for ApplicationSdp { const CONTENT_TYPE: &'static str = "application/sdp"; fn check_content_type(content_type: &str) -> bool { - match content_type { - "application/sdp" => true, - _ => false, - } + content_type.eq(Self::CONTENT_TYPE) } fn schema_ref() -> MetaSchemaRef { @@ -49,8 +46,8 @@ impl Payload for ApplicationSdp { impl ParsePayload for ApplicationSdp { const IS_REQUIRED: bool = true; - fn from_request(request: &Request, body: &mut RequestBody) -> impl std::future::Future> + Send { - async move { Ok(Self(String::from_request(request, body).await?)) } + async fn from_request(request: &Request, body: &mut RequestBody) -> Result { + Ok(Self(String::from_request(request, body).await?)) } } @@ -100,10 +97,7 @@ impl Payload for ApplicationSdpPatch { const CONTENT_TYPE: &'static str = "application/trickle-ice-sdpfrag"; fn check_content_type(content_type: &str) -> bool { - match content_type { - "application/trickle-ice-sdpfrag" => true, - _ => false, - } + content_type.eq(Self::CONTENT_TYPE) } fn schema_ref() -> MetaSchemaRef { @@ -114,8 +108,8 @@ impl Payload for ApplicationSdpPatch { impl ParsePayload for ApplicationSdpPatch { const IS_REQUIRED: bool = true; - fn from_request(request: &Request, body: &mut RequestBody) -> impl std::future::Future> + Send { - async move { Ok(Self(String::from_request(request, body).await?)) } + async fn from_request(request: &Request, body: &mut RequestBody) -> Result { + Ok(Self(String::from_request(request, body).await?)) } } diff --git a/bin/src/http/utils/remote_ip.rs b/bin/src/http/utils/remote_ip.rs index e6fff11c..775a30d9 100644 --- a/bin/src/http/utils/remote_ip.rs +++ b/bin/src/http/utils/remote_ip.rs @@ -6,25 +6,19 @@ use poem::{http::StatusCode, FromRequest}; pub struct RemoteIpAddr(pub IpAddr); impl<'a> FromRequest<'a> for RemoteIpAddr { - fn from_request(req: &'a poem::Request, _body: &mut poem::RequestBody) -> impl std::future::Future> + Send { - async move { - let headers = req.headers(); - if let Some(remote_addr) = headers.get("X-Forwarded-For") { - let remote_addr = remote_addr.to_str().map_err(|_| poem::Error::from_string("Bad Request", StatusCode::BAD_REQUEST))?; - let remote_addr = remote_addr.split(',').next().ok_or(poem::Error::from_string("Bad Request", StatusCode::BAD_REQUEST))?; - return Ok(RemoteIpAddr(remote_addr.parse().map_err(|_| poem::Error::from_string("Invalid IP address", StatusCode::BAD_REQUEST))?)); - } else if let Some(remote_addr) = headers.get("X-Real-IP") { - let remote_addr = remote_addr.to_str().map_err(|_| poem::Error::from_string("Bad Request", StatusCode::BAD_REQUEST))?; - return Ok(RemoteIpAddr(remote_addr.parse().map_err(|_| poem::Error::from_string("Invalid IP address", StatusCode::BAD_REQUEST))?)); - } else { - match req.remote_addr().deref() { - poem::Addr::SocketAddr(addr) => { - return Ok(RemoteIpAddr(addr.ip())); - } - _ => { - return Err(poem::Error::from_string("Bad Request", StatusCode::BAD_REQUEST)); - } - } + async fn from_request(req: &'a poem::Request, _body: &mut poem::RequestBody) -> poem::Result { + let headers = req.headers(); + if let Some(remote_addr) = headers.get("X-Forwarded-For") { + let remote_addr = remote_addr.to_str().map_err(|_| poem::Error::from_string("Bad Request", StatusCode::BAD_REQUEST))?; + let remote_addr = remote_addr.split(',').next().ok_or(poem::Error::from_string("Bad Request", StatusCode::BAD_REQUEST))?; + Ok(RemoteIpAddr(remote_addr.parse().map_err(|_| poem::Error::from_string("Invalid IP address", StatusCode::BAD_REQUEST))?)) + } else if let Some(remote_addr) = headers.get("X-Real-IP") { + let remote_addr = remote_addr.to_str().map_err(|_| poem::Error::from_string("Bad Request", StatusCode::BAD_REQUEST))?; + Ok(RemoteIpAddr(remote_addr.parse().map_err(|_| poem::Error::from_string("Invalid IP address", StatusCode::BAD_REQUEST))?)) + } else { + match req.remote_addr().deref() { + poem::Addr::SocketAddr(addr) => Ok(RemoteIpAddr(addr.ip())), + _ => Err(poem::Error::from_string("Bad Request", StatusCode::BAD_REQUEST)), } } } diff --git a/bin/src/http/utils/token.rs b/bin/src/http/utils/token.rs index 26109365..07144892 100644 --- a/bin/src/http/utils/token.rs +++ b/bin/src/http/utils/token.rs @@ -1,4 +1,4 @@ -use poem_openapi::{auth::Bearer, OpenApi, SecurityScheme}; +use poem_openapi::{auth::Bearer, SecurityScheme}; #[derive(SecurityScheme)] #[oai(rename = "Token Authorization", ty = "bearer", key_in = "header", key_name = "Authorization")] diff --git a/bin/src/http/utils/user_agent.rs b/bin/src/http/utils/user_agent.rs index 6b95bdba..c195d9f4 100644 --- a/bin/src/http/utils/user_agent.rs +++ b/bin/src/http/utils/user_agent.rs @@ -4,12 +4,10 @@ use poem::{http::StatusCode, FromRequest}; pub struct UserAgent(pub String); impl<'a> FromRequest<'a> for UserAgent { - fn from_request(req: &'a poem::Request, _body: &mut poem::RequestBody) -> impl std::future::Future> + Send { - async move { - let headers = req.headers(); - let user_agent = headers.get("User-Agent").ok_or(poem::Error::from_string("Bad Request", StatusCode::BAD_REQUEST))?; - let user_agent = user_agent.to_str().map_err(|_| poem::Error::from_string("Bad Request", StatusCode::BAD_REQUEST))?; - Ok(UserAgent(user_agent.into())) - } + async fn from_request(req: &'a poem::Request, _body: &mut poem::RequestBody) -> poem::Result { + let headers = req.headers(); + let user_agent = headers.get("User-Agent").ok_or(poem::Error::from_string("Bad Request", StatusCode::BAD_REQUEST))?; + let user_agent = user_agent.to_str().map_err(|_| poem::Error::from_string("Bad Request", StatusCode::BAD_REQUEST))?; + Ok(UserAgent(user_agent.into())) } } diff --git a/bin/src/main.rs b/bin/src/main.rs index f9edf569..623fae11 100644 --- a/bin/src/main.rs +++ b/bin/src/main.rs @@ -82,7 +82,11 @@ async fn main() { #[cfg(feature = "media")] server::ServerType::Media(args) => server::run_media_server(workers, http_port, node, args).await, #[cfg(feature = "cert_utils")] - server::ServerType::Cert(args) => server::run_cert_utils(args).await, + server::ServerType::Cert(args) => { + if let Err(e) = server::run_cert_utils(args).await { + log::error!("create cert error {:?}", e); + } + } } }) .await; diff --git a/bin/src/quinn/builder.rs b/bin/src/quinn/builder.rs index 3868996a..2b122277 100644 --- a/bin/src/quinn/builder.rs +++ b/bin/src/quinn/builder.rs @@ -19,7 +19,7 @@ pub fn make_quinn_server(socket: VirtualUdpSocket, priv_key: PrivatePkcs8KeyDer< pub fn make_quinn_client(socket: VirtualUdpSocket, server_certs: &[CertificateDer]) -> Result> { let runtime = Arc::new(TokioRuntime); let mut config = EndpointConfig::default(); - //Note that client mtu size shoud be smaller than server's + //Note that client mtu size should be smaller than server's config.max_udp_payload_size(1400).expect("Should config quinn client max_size to 1400"); let mut endpoint = Endpoint::new_with_abstract_socket(config, None, Arc::new(socket), runtime)?; endpoint.set_default_client_config(configure_client(server_certs)?); @@ -70,11 +70,11 @@ impl SkipServerVerification { } impl ServerCertVerifier for SkipServerVerification { - fn verify_tls12_signature(&self, message: &[u8], cert: &CertificateDer<'_>, dss: &rustls::DigitallySignedStruct) -> Result { + fn verify_tls12_signature(&self, _message: &[u8], _cert: &CertificateDer<'_>, _dss: &rustls::DigitallySignedStruct) -> Result { Ok(rustls::client::danger::HandshakeSignatureValid::assertion()) } - fn verify_tls13_signature(&self, message: &[u8], cert: &CertificateDer<'_>, dss: &rustls::DigitallySignedStruct) -> Result { + fn verify_tls13_signature(&self, _message: &[u8], _cert: &CertificateDer<'_>, _dss: &rustls::DigitallySignedStruct) -> Result { Ok(rustls::client::danger::HandshakeSignatureValid::assertion()) } @@ -84,11 +84,11 @@ impl ServerCertVerifier for SkipServerVerification { fn verify_server_cert( &self, - end_entity: &CertificateDer<'_>, - intermediates: &[CertificateDer<'_>], - server_name: &rustls::pki_types::ServerName<'_>, - ocsp_response: &[u8], - now: rustls::pki_types::UnixTime, + _end_entity: &CertificateDer<'_>, + _intermediates: &[CertificateDer<'_>], + _server_name: &rustls::pki_types::ServerName<'_>, + _ocsp_response: &[u8], + _now: rustls::pki_types::UnixTime, ) -> Result { Ok(rustls::client::danger::ServerCertVerified::assertion()) } diff --git a/bin/src/quinn/mod.rs b/bin/src/quinn/mod.rs index 04ab220b..c93a316b 100644 --- a/bin/src/quinn/mod.rs +++ b/bin/src/quinn/mod.rs @@ -4,4 +4,3 @@ mod vsocket; pub use builder::{make_quinn_client, make_quinn_server}; pub use vnet::VirtualNetwork; -pub use vsocket::VirtualUdpSocket; diff --git a/bin/src/quinn/vsocket.rs b/bin/src/quinn/vsocket.rs index 25809ac5..5aecc6d6 100644 --- a/bin/src/quinn/vsocket.rs +++ b/bin/src/quinn/vsocket.rs @@ -22,7 +22,7 @@ pub struct Poller {} impl UdpPoller for Poller { fn poll_writable(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { - //TODO implement this for better performace + //TODO implement this for better performance Poll::Ready(Ok(())) } } diff --git a/bin/src/server/cert.rs b/bin/src/server/cert.rs index 20138071..a68202bb 100644 --- a/bin/src/server/cert.rs +++ b/bin/src/server/cert.rs @@ -10,10 +10,11 @@ pub struct Args { domains: Vec, } -pub async fn run_cert_utils(args: Args) { - let cert = rcgen::generate_simple_self_signed(args.domains).unwrap(); +pub async fn run_cert_utils(args: Args) -> Result<(), Box> { + let cert = rcgen::generate_simple_self_signed(args.domains)?; let start = SystemTime::now(); let since_the_epoch = start.duration_since(UNIX_EPOCH).expect("Time went backwards").as_millis(); - std::fs::write(format!("./certificate-{}.cert", since_the_epoch), cert.cert.der().to_vec()).unwrap(); - std::fs::write(format!("./certificate-{}.key", since_the_epoch), cert.key_pair.serialize_der().to_vec()).unwrap(); + std::fs::write(format!("./certificate-{}.cert", since_the_epoch), cert.cert.der())?; + std::fs::write(format!("./certificate-{}.key", since_the_epoch), cert.key_pair.serialize_der())?; + Ok(()) } diff --git a/bin/src/server/connector.rs b/bin/src/server/connector.rs index 78a664e6..dc3837eb 100644 --- a/bin/src/server/connector.rs +++ b/bin/src/server/connector.rs @@ -3,6 +3,6 @@ use clap::Parser; #[derive(Debug, Parser)] pub struct Args {} -pub async fn run_media_connector(workers: usize, args: Args) { +pub async fn run_media_connector(_workers: usize, _args: Args) { println!("Running media connector"); } diff --git a/bin/src/server/gateway.rs b/bin/src/server/gateway.rs index b730cca2..4b9bfa6c 100644 --- a/bin/src/server/gateway.rs +++ b/bin/src/server/gateway.rs @@ -106,7 +106,7 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod let (mut vnet, vnet_tx, mut vnet_rx) = VirtualNetwork::new(node.node_id); let media_rpc_socket = vnet.udp_socket(0).await.expect("Should open virtual port for gateway rpc"); - let media_rpc_client = MediaEdgeServiceClient::new(QuinnClient::new(make_quinn_client(media_rpc_socket, &vec![]).expect("Should create endpoint for media rpc client"))); + let media_rpc_client = MediaEdgeServiceClient::new(QuinnClient::new(make_quinn_client(media_rpc_socket, &[]).expect("Should create endpoint for media rpc client"))); let media_rpc_socket = vnet.udp_socket(GATEWAY_RPC_PORT).await.expect("Should open virtual port for gateway rpc"); let mut media_rpc_server = MediaEdgeServiceServer::new( @@ -123,7 +123,7 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod media_rpc_server.run().await; }); - tokio::task::spawn_local(async move { while let Some(_) = vnet.recv().await {} }); + tokio::task::spawn_local(async move { while vnet.recv().await.is_some() {} }); loop { if controller.process().is_none() { diff --git a/bin/src/server/gateway/dest_selector.rs b/bin/src/server/gateway/dest_selector.rs index 696b618d..6751b37b 100644 --- a/bin/src/server/gateway/dest_selector.rs +++ b/bin/src/server/gateway/dest_selector.rs @@ -7,9 +7,11 @@ use tokio::sync::{ oneshot, }; +type QueryRequest = (ServiceKind, Option<(f32, f32)>, oneshot::Sender>); + #[derive(Clone)] pub struct GatewayDestSelector { - tx: Sender<(ServiceKind, Option<(f32, f32)>, oneshot::Sender>)>, + tx: Sender, } impl GatewayDestSelector { @@ -21,7 +23,7 @@ impl GatewayDestSelector { } pub struct GatewayDestRequester { - rx: Receiver<(ServiceKind, Option<(f32, f32)>, oneshot::Sender>)>, + rx: Receiver, req_seed: u64, reqs: HashMap>>, } @@ -31,7 +33,7 @@ impl GatewayDestRequester { match event { media_server_gateway::store_service::Event::FindNodeRes(req_id, res) => { if let Some(tx) = self.reqs.remove(&req_id) { - if let Err(_) = tx.send(res) { + if tx.send(res).is_err() { log::error!("[GatewayDestRequester] answer for req_id {req_id} error"); } } diff --git a/bin/src/server/gateway/rpc_handler.rs b/bin/src/server/gateway/rpc_handler.rs index 39d72dd9..d34fb627 100644 --- a/bin/src/server/gateway/rpc_handler.rs +++ b/bin/src/server/gateway/rpc_handler.rs @@ -31,7 +31,7 @@ pub struct MediaRpcHandlerImpl {} impl MediaEdgeServiceHandler for MediaRpcHandlerImpl { async fn whip_connect(&self, ctx: &Ctx, req: WhipConnectRequest) -> Option { log::info!("On whip_connect from other gateway"); - let location = req.ip.parse().ok().map(|ip| ctx.ip2location.get_location(&ip)).flatten(); + let location = req.ip.parse().ok().and_then(|ip| ctx.ip2location.get_location(&ip)); let dest = ctx.selector.select(ServiceKind::Webrtc, location).await?; let dest_addr = node_vnet_addr(dest, GATEWAY_RPC_PORT); ctx.client.whip_connect(dest_addr, req).await @@ -55,7 +55,7 @@ impl MediaEdgeServiceHandler for MediaRpcHandlerImpl { async fn whep_connect(&self, ctx: &Ctx, req: WhepConnectRequest) -> Option { log::info!("On whep_connect from other gateway"); - let location = req.ip.parse().ok().map(|ip| ctx.ip2location.get_location(&ip)).flatten(); + let location = req.ip.parse().ok().and_then(|ip| ctx.ip2location.get_location(&ip)); let dest = ctx.selector.select(ServiceKind::Webrtc, location).await?; let dest_addr = node_vnet_addr(dest, GATEWAY_RPC_PORT); ctx.client.whep_connect(dest_addr, req).await @@ -79,7 +79,7 @@ impl MediaEdgeServiceHandler for MediaRpcHandlerImpl { async fn webrtc_connect(&self, ctx: &Ctx, req: WebrtcConnectRequest) -> Option { log::info!("On webrtc_connect from other gateway"); - let location = req.ip.parse().ok().map(|ip| ctx.ip2location.get_location(&ip)).flatten(); + let location = req.ip.parse().ok().and_then(|ip| ctx.ip2location.get_location(&ip)); let dest = ctx.selector.select(ServiceKind::Webrtc, location).await?; let dest_addr = node_vnet_addr(dest, GATEWAY_RPC_PORT); ctx.client.webrtc_connect(dest_addr, req).await diff --git a/bin/src/server/media.rs b/bin/src/server/media.rs index 68d29d46..1a13a424 100644 --- a/bin/src/server/media.rs +++ b/bin/src/server/media.rs @@ -123,7 +123,7 @@ pub async fn run_media_server(workers: usize, http_port: Option, node: Node media_rpc_server.run().await; }); - tokio::task::spawn_local(async move { while let Some(_) = vnet.recv().await {} }); + tokio::task::spawn_local(async move { while vnet.recv().await.is_some() {} }); loop { if controller.process().is_none() { @@ -160,7 +160,7 @@ pub async fn run_media_server(workers: usize, http_port: Option, node: Node log::info!("on req {req_id} res from worker {worker}"); let res = res.up(worker).up((node_id, node_session)); if let Some(tx) = reqs.remove(&req_id) { - if let Err(_) = tx.send(res) { + if tx.send(res).is_err() { log::error!("Send rpc response error for req {req_id}"); } } diff --git a/bin/src/server/media/rpc_handler.rs b/bin/src/server/media/rpc_handler.rs index 4a7d73f1..e704dedc 100644 --- a/bin/src/server/media/rpc_handler.rs +++ b/bin/src/server/media/rpc_handler.rs @@ -56,7 +56,7 @@ impl MediaEdgeServiceHandler for MediaRpcHandlerImpl { let res = rx.await.ok()?; //TODO process with ICE restart match res { - RpcRes::Whip(whip::RpcRes::RemoteIce(res)) => res.ok().map(|r| WhipRemoteIceResponse { conn }), + RpcRes::Whip(whip::RpcRes::RemoteIce(res)) => res.ok().map(|_r| WhipRemoteIceResponse { conn }), _ => None, } } @@ -70,7 +70,7 @@ impl MediaEdgeServiceHandler for MediaRpcHandlerImpl { let res = rx.await.ok()?; //TODO process with ICE restart match res { - RpcRes::Whip(whip::RpcRes::Delete(res)) => res.ok().map(|r| WhipCloseResponse { conn }), + RpcRes::Whip(whip::RpcRes::Delete(res)) => res.ok().map(|_r| WhipCloseResponse { conn }), _ => None, } } @@ -100,7 +100,7 @@ impl MediaEdgeServiceHandler for MediaRpcHandlerImpl { let res = rx.await.ok()?; //TODO process with ICE restart match res { - RpcRes::Whep(whep::RpcRes::RemoteIce(res)) => res.ok().map(|r| WhepRemoteIceResponse { conn }), + RpcRes::Whep(whep::RpcRes::RemoteIce(res)) => res.ok().map(|_r| WhepRemoteIceResponse { conn }), _ => None, } } @@ -114,7 +114,7 @@ impl MediaEdgeServiceHandler for MediaRpcHandlerImpl { let res = rx.await.ok()?; //TODO process with ICE restart match res { - RpcRes::Whep(whep::RpcRes::Delete(res)) => res.ok().map(|r| WhepCloseResponse { conn }), + RpcRes::Whep(whep::RpcRes::Delete(res)) => res.ok().map(|_r| WhepCloseResponse { conn }), _ => None, } } diff --git a/bin/src/server/media/runtime_worker.rs b/bin/src/server/media/runtime_worker.rs index 0811cb25..242431aa 100644 --- a/bin/src/server/media/runtime_worker.rs +++ b/bin/src/server/media/runtime_worker.rs @@ -63,7 +63,7 @@ impl WorkerInner MediaCluster { } } - pub fn shutdown<'a>(&mut self, now: Instant) { + pub fn shutdown(&mut self, now: Instant) { self.rooms.on_shutdown(now); } } diff --git a/packages/media_core/src/cluster/room/channel_sub.rs b/packages/media_core/src/cluster/room/channel_sub.rs index 26c06602..0ada5077 100644 --- a/packages/media_core/src/cluster/room/channel_sub.rs +++ b/packages/media_core/src/cluster/room/channel_sub.rs @@ -102,7 +102,7 @@ impl RoomChannelSubscribe { owner ); self.subscribers.insert((owner, track), (channel_id, target_peer, target_track)); - let channel_container = self.channels.entry(channel_id).or_insert(Default::default()); + let channel_container = self.channels.entry(channel_id).or_default(); channel_container.owners.push((owner, track)); if channel_container.owners.len() == 1 { log::info!("[ClusterRoom {}/Subscribers] first subscriber => Sub channel {channel_id}", self.room); @@ -136,7 +136,7 @@ impl RoomChannelSubscribe { if let Some(sum_fb) = &mut sum_fb { *sum_fb = *sum_fb + *fb; } else { - sum_fb = Some(fb.clone()); + sum_fb = Some(*fb); } } log::debug!("[ClusterRoom {}/Subscribers] channel {channel_id} setting desired bitrate {:?}", self.room, sum_fb); diff --git a/packages/media_core/src/cluster/room/metadata.rs b/packages/media_core/src/cluster/room/metadata.rs index 80d4bb51..5a60cec5 100644 --- a/packages/media_core/src/cluster/room/metadata.rs +++ b/packages/media_core/src/cluster/room/metadata.rs @@ -144,18 +144,14 @@ impl RoomMetadata { self.queue.push_back(Output::Kv(dht_kv::Control::MapCmd(peer_map, MapControl::Del(track_key)))); } - if self.peers_map_subscribers.remove(&owner).is_some() { - if self.peers_map_subscribers.is_empty() { - log::info!("[ClusterRoom {}] last peer unsub peers map => unsubscribe", self.room); - self.queue.push_back(Output::Kv(dht_kv::Control::MapCmd(self.peers_map, MapControl::Unsub))); - } + if self.peers_map_subscribers.remove(&owner).is_some() && self.peers_map_subscribers.is_empty() { + log::info!("[ClusterRoom {}] last peer unsub peers map => unsubscribe", self.room); + self.queue.push_back(Output::Kv(dht_kv::Control::MapCmd(self.peers_map, MapControl::Unsub))); } - if self.tracks_map_subscribers.remove(&owner).is_some() { - if self.tracks_map_subscribers.is_empty() { - log::info!("[ClusterRoom {}] last peer unsub tracks map => unsubscribe", self.room); - self.queue.push_back(Output::Kv(dht_kv::Control::MapCmd(self.tracks_map, MapControl::Unsub))); - } + if self.tracks_map_subscribers.remove(&owner).is_some() && self.tracks_map_subscribers.is_empty() { + log::info!("[ClusterRoom {}] last peer unsub tracks map => unsubscribe", self.room); + self.queue.push_back(Output::Kv(dht_kv::Control::MapCmd(self.tracks_map, MapControl::Unsub))); } // check if this peer manual subscribe to some private peer map => need send Unsub diff --git a/packages/media_core/src/endpoint.rs b/packages/media_core/src/endpoint.rs index b8bf7609..b7157060 100644 --- a/packages/media_core/src/endpoint.rs +++ b/packages/media_core/src/endpoint.rs @@ -198,7 +198,7 @@ impl, ExtIn, ExtOut> Endpoint { transport: TaskSwitcherBranch::new(transport, TaskType::Transport), internal: TaskSwitcherBranch::new(EndpointInternal::new(cfg), TaskType::Internal), switcher: TaskSwitcher::new(2), - _tmp: PhantomData::default(), + _tmp: PhantomData, } } } diff --git a/packages/media_core/src/endpoint/internal.rs b/packages/media_core/src/endpoint/internal.rs index b0564e43..b20d80ca 100644 --- a/packages/media_core/src/endpoint/internal.rs +++ b/packages/media_core/src/endpoint/internal.rs @@ -72,7 +72,7 @@ impl EndpointInternal { } } - pub fn on_tick<'a>(&mut self, now: Instant) { + pub fn on_tick(&mut self, now: Instant) { self.bitrate_allocator.input(&mut self.switcher).on_tick(); self.local_tracks.input(&mut self.switcher).on_tick(now); self.remote_tracks.input(&mut self.switcher).on_tick(now); @@ -104,14 +104,14 @@ impl EndpointInternal { TransportEvent::LocalTrack(track, event) => self.on_transport_local_track(now, track, event), TransportEvent::Stats(stats) => self.on_transport_stats(now, stats), TransportEvent::EgressBitrateEstimate(bitrate) => { - let bitrate2 = bitrate.min(self.cfg.max_egress_bitrate as u64); + let bitrate2 = bitrate.min(self.cfg.max_egress_bitrate); log::debug!("[EndpointInternal] limit egress bitrate {bitrate2}, rewrite from {bitrate}"); self.bitrate_allocator.input(&mut self.switcher).set_egress_estimate(bitrate2); } } } - pub fn on_transport_rpc<'a>(&mut self, now: Instant, req_id: EndpointReqId, req: EndpointReq) { + pub fn on_transport_rpc(&mut self, now: Instant, req_id: EndpointReqId, req: EndpointReq) { match req { EndpointReq::JoinRoom(room, peer, meta, publish, subscribe) => { if matches!(self.state, TransportState::Connecting) { @@ -159,7 +159,7 @@ impl EndpointInternal { } } - fn on_transport_state_changed<'a>(&mut self, now: Instant, state: TransportState) { + fn on_transport_state_changed(&mut self, now: Instant, state: TransportState) { self.state = state; match &self.state { TransportState::Connecting => { @@ -186,10 +186,10 @@ impl EndpointInternal { } } - fn on_transport_remote_track<'a>(&mut self, now: Instant, track: RemoteTrackId, event: RemoteTrackEvent) { + fn on_transport_remote_track(&mut self, now: Instant, track: RemoteTrackId, event: RemoteTrackEvent) { if let Some(meta) = event.need_create() { log::info!("[EndpointInternal] create remote track {:?}", track); - let room = self.joined.as_ref().map(|j| j.0.clone()); + let room = self.joined.as_ref().map(|j| j.0); let index = self.remote_tracks.input(&mut self.switcher).add_task(EndpointRemoteTrack::new(room, meta)); self.remote_tracks_id.insert(track, index); } @@ -197,10 +197,10 @@ impl EndpointInternal { self.remote_tracks.input(&mut self.switcher).on_event(now, *index, remote_track::Input::Event(event)); } - fn on_transport_local_track<'a>(&mut self, now: Instant, track: LocalTrackId, event: LocalTrackEvent) { + fn on_transport_local_track(&mut self, now: Instant, track: LocalTrackId, event: LocalTrackEvent) { if let Some(kind) = event.need_create() { log::info!("[EndpointInternal] create local track {:?}", track); - let room = self.joined.as_ref().map(|j| j.0.clone()); + let room = self.joined.as_ref().map(|j| j.0); let index = self.local_tracks.input(&mut self.switcher).add_task(EndpointLocalTrack::new(kind, room)); self.local_tracks_id.insert(track, index); } @@ -208,9 +208,10 @@ impl EndpointInternal { self.local_tracks.input(&mut self.switcher).on_event(now, *index, local_track::Input::Event(event)); } - fn on_transport_stats<'a>(&mut self, _now: Instant, _stats: TransportStats) {} + fn on_transport_stats(&mut self, _now: Instant, _stats: TransportStats) {} - fn join_room<'a>(&mut self, now: Instant, req_id: EndpointReqId, room: RoomId, peer: PeerId, meta: PeerMeta, publish: RoomInfoPublish, subscribe: RoomInfoSubscribe) { + #[allow(clippy::too_many_arguments)] + fn join_room(&mut self, now: Instant, req_id: EndpointReqId, room: RoomId, peer: PeerId, meta: PeerMeta, publish: RoomInfoPublish, subscribe: RoomInfoSubscribe) { let room_hash: ClusterRoomHash = (&room).into(); log::info!("[EndpointInternal] join_room({room}, {peer}), room_hash {room_hash}"); self.queue.push_back(InternalOutput::RpcRes(req_id, EndpointRes::JoinRoom(Ok(())))); @@ -230,7 +231,7 @@ impl EndpointInternal { } } - fn leave_room<'a>(&mut self, now: Instant) { + fn leave_room(&mut self, now: Instant) { let (hash, room, peer) = return_if_none!(self.joined.take()); log::info!("[EndpointInternal] leave_room({room}, {peer})"); @@ -248,7 +249,7 @@ impl EndpointInternal { /// This block is for cluster related events impl EndpointInternal { - pub fn on_cluster_event<'a>(&mut self, now: Instant, event: ClusterEndpointEvent) { + pub fn on_cluster_event(&mut self, now: Instant, event: ClusterEndpointEvent) { match event { ClusterEndpointEvent::PeerJoined(peer, meta) => self.queue.push_back(InternalOutput::Event(EndpointEvent::PeerJoined(peer, meta))), ClusterEndpointEvent::PeerLeaved(peer, meta) => self.queue.push_back(InternalOutput::Event(EndpointEvent::PeerLeaved(peer, meta))), @@ -259,12 +260,12 @@ impl EndpointInternal { } } - fn on_cluster_remote_track<'a>(&mut self, now: Instant, id: RemoteTrackId, event: ClusterRemoteTrackEvent) { + fn on_cluster_remote_track(&mut self, now: Instant, id: RemoteTrackId, event: ClusterRemoteTrackEvent) { let index = return_if_none!(self.remote_tracks_id.get1(&id)); self.remote_tracks.input(&mut self.switcher).on_event(now, *index, remote_track::Input::Cluster(event)); } - fn on_cluster_local_track<'a>(&mut self, now: Instant, id: LocalTrackId, event: ClusterLocalTrackEvent) { + fn on_cluster_local_track(&mut self, now: Instant, id: LocalTrackId, event: ClusterLocalTrackEvent) { let index = return_if_none!(self.local_tracks_id.get1(&id)); self.local_tracks.input(&mut self.switcher).on_event(now, *index, local_track::Input::Cluster(event)); } @@ -272,7 +273,7 @@ impl EndpointInternal { /// This block for internal local and remote track impl EndpointInternal { - fn pop_remote_tracks<'a>(&mut self, now: Instant) { + fn pop_remote_tracks(&mut self, now: Instant) { let (index, out) = return_if_none!(self.remote_tracks.pop_output(now, &mut self.switcher)); let id = *self.remote_tracks_id.get2(&index).expect("Should have remote_track_id"); diff --git a/packages/media_core/src/endpoint/internal/bitrate_allocator/egress.rs b/packages/media_core/src/endpoint/internal/bitrate_allocator/egress.rs index 73e47173..efa1901c 100644 --- a/packages/media_core/src/endpoint/internal/bitrate_allocator/egress.rs +++ b/packages/media_core/src/endpoint/internal/bitrate_allocator/egress.rs @@ -71,7 +71,7 @@ impl EgressBitrateAllocator { let use_bitrate = self.egress_bitrate.min(self.max_egress_bitrate); let mut sum = TrackPriority(0); for (_track, priority) in self.tracks.iter() { - sum = sum + *priority; + sum += *priority; } if *(sum.as_ref()) != 0 { @@ -82,7 +82,7 @@ impl EgressBitrateAllocator { } } - if self.tracks.len() > 0 { + if !self.tracks.is_empty() { //TODO fix issue when config max_egress_bitrate is lower than stream bitrate, this will make BWE pacer //slow down sending packet, then latency of viewer will be increase let current = use_bitrate; diff --git a/packages/media_core/src/endpoint/internal/bitrate_allocator/ingress.rs b/packages/media_core/src/endpoint/internal/bitrate_allocator/ingress.rs index 4be8ce60..39bf66f0 100644 --- a/packages/media_core/src/endpoint/internal/bitrate_allocator/ingress.rs +++ b/packages/media_core/src/endpoint/internal/bitrate_allocator/ingress.rs @@ -53,7 +53,7 @@ impl IngressBitrateAllocator { self.changed = false; let mut sum = TrackPriority(0); for (_track, priority) in self.tracks.iter() { - sum = sum + *priority; + sum += *priority; } if *(sum.as_ref()) != 0 { diff --git a/packages/media_core/src/endpoint/internal/local_track/packet_selector.rs b/packages/media_core/src/endpoint/internal/local_track/packet_selector.rs index 160965ec..5991baae 100644 --- a/packages/media_core/src/endpoint/internal/local_track/packet_selector.rs +++ b/packages/media_core/src/endpoint/internal/local_track/packet_selector.rs @@ -88,13 +88,13 @@ impl PacketSelector { } pub fn on_tick(&mut self, now_ms: u64) { - self.selector.as_mut().map(|s| s.on_tick(&mut self.ctx, now_ms)); - if self.need_key_frame { - if self.last_key_frame_ts.is_none() || self.last_key_frame_ts.expect("Should have") + REQUEST_KEY_FRAME_INTERVAL_MS <= now_ms { - log::info!("[LocalTrack/PacketSelector] on_tick => request key after interval"); - self.last_key_frame_ts = Some(now_ms); - self.queue.push_back(Action::RequestKeyFrame); - } + if let Some(s) = self.selector.as_mut() { + s.on_tick(&mut self.ctx, now_ms); + } + if self.need_key_frame && (self.last_key_frame_ts.is_none() || self.last_key_frame_ts.expect("Should have") + REQUEST_KEY_FRAME_INTERVAL_MS <= now_ms) { + log::info!("[LocalTrack/PacketSelector] on_tick => request key after interval"); + self.last_key_frame_ts = Some(now_ms); + self.queue.push_back(Action::RequestKeyFrame); } } @@ -111,13 +111,17 @@ impl PacketSelector { pub fn set_target_bitrate(&mut self, now_ms: u64, bitrate: u64) { log::debug!("[LocalTrack/PacketSelector] set target bitrate to {}", bitrate); self.bitrate = Some(bitrate); - self.selector.as_mut().map(|s| s.set_target_bitrate(&mut self.ctx, now_ms, bitrate)); + if let Some(s) = self.selector.as_mut() { + s.set_target_bitrate(&mut self.ctx, now_ms, bitrate); + } } /// Set limit layer, which is used for select best layer pub fn set_limit_layer(&mut self, now_ms: u64, max_spatial: u8, min_spatial: u8) { self.limit = (max_spatial, min_spatial); - self.selector.as_mut().map(|s| s.set_limit_layer(&mut self.ctx, now_ms, max_spatial, min_spatial)); + if let Some(s) = self.selector.as_mut() { + s.set_limit_layer(&mut self.ctx, now_ms, max_spatial, min_spatial); + } } pub fn select(&mut self, now_ms: u64, channel: u64, pkt: &mut MediaPacket) -> Option<()> { @@ -188,8 +192,8 @@ impl PacketSelector { self.need_key_frame = false; } if self.selector.is_none() && pkt.meta.is_video_key() { - self.selector = create_selector(&pkt, bitrate, self.limit); - self.selector.as_mut().map(|s| s.on_init(&mut self.ctx, now_ms)); + self.selector = create_selector(pkt, bitrate, self.limit); + self.selector.as_mut().expect("Should have video selector").on_init(&mut self.ctx, now_ms); } self.selector.as_mut()?.select(&mut self.ctx, now_ms, channel, pkt) @@ -222,23 +226,23 @@ fn create_selector(pkt: &MediaPacket, bitrate: u64, limit: (u8, u8)) -> Option { - let layers = pkt.layers.clone().unwrap_or_else(|| MediaLayersBitrate::default_sim()); + let layers = pkt.layers.clone().unwrap_or_else(MediaLayersBitrate::default_sim); log::info!("[LocalTrack/PacketSelector] create H264SimSelector"); Some(Box::new(video_h264_sim::Selector::new(bitrate, layers.clone(), limit))) } MediaMeta::Vp8 { sim: Some(_), .. } => { - let layers = pkt.layers.clone().unwrap_or_else(|| MediaLayersBitrate::default_sim()); + let layers = pkt.layers.clone().unwrap_or_else(MediaLayersBitrate::default_sim); log::info!("[LocalTrack/PacketSelector] create Vp8SimSelector"); Some(Box::new(video_vp8_sim::Selector::new(bitrate, layers.clone(), limit))) } MediaMeta::Vp9 { svc: Some(_), .. } => { - let layers = pkt.layers.clone().unwrap_or_else(|| MediaLayersBitrate::default_sim()); + let layers = pkt.layers.clone().unwrap_or_else(MediaLayersBitrate::default_sim); log::info!("[LocalTrack/PacketSelector] create Vp9SvcSelector"); Some(Box::new(video_vp9_svc::Selector::new(false, bitrate, layers.clone(), limit))) } MediaMeta::H264 { sim: None, .. } | MediaMeta::Vp8 { sim: None, .. } | MediaMeta::Vp9 { svc: None, .. } => { log::info!("[LocalTrack/PacketSelector] create VideoSingleSelector"); - Some(Box::new(video_single::VideoSingleSelector::default())) + Some(Box::::default()) } } } diff --git a/packages/media_core/src/endpoint/internal/local_track/packet_selector/video_h264_sim.rs b/packages/media_core/src/endpoint/internal/local_track/packet_selector/video_h264_sim.rs index eca519c1..f053d3be 100644 --- a/packages/media_core/src/endpoint/internal/local_track/packet_selector/video_h264_sim.rs +++ b/packages/media_core/src/endpoint/internal/local_track/packet_selector/video_h264_sim.rs @@ -38,7 +38,7 @@ impl Selector { self.target = target; if let Some(target) = self.target { - if self.current == None || target != self.current.expect("Should have") { + if self.current.is_none() || target != self.current.expect("Should have") { log::info!("[H264SimSelector] switch to new spatial layer => request key frame"); self.queue.push_back(Action::RequestKeyFrame); } diff --git a/packages/media_core/src/endpoint/internal/local_track/packet_selector/video_vp8_sim.rs b/packages/media_core/src/endpoint/internal/local_track/packet_selector/video_vp8_sim.rs index 6d7cc0c3..4773ae6c 100644 --- a/packages/media_core/src/endpoint/internal/local_track/packet_selector/video_vp8_sim.rs +++ b/packages/media_core/src/endpoint/internal/local_track/packet_selector/video_vp8_sim.rs @@ -58,7 +58,7 @@ impl Selector { self.target = target; if let Some(target) = &self.target { - if self.current == None || target.spatial != self.current.as_ref().expect("Should have").spatial { + if self.current.is_none() || target.spatial != self.current.as_ref().expect("Should have").spatial { log::info!("[Vp8SimSelector] switch to new spatial layer => request key frame"); self.queue.push_back(Action::RequestKeyFrame); } diff --git a/packages/media_core/src/endpoint/internal/remote_track.rs b/packages/media_core/src/endpoint/internal/remote_track.rs index 4e4152dc..ddb96ced 100644 --- a/packages/media_core/src/endpoint/internal/remote_track.rs +++ b/packages/media_core/src/endpoint/internal/remote_track.rs @@ -158,7 +158,7 @@ impl EndpointRemoteTrack { } fn calc_limit_bitrate(&self) -> Option<(u64, u64)> { - let cluster_limit = self.meta.control.eq(&BitrateControlMode::DynamicConsumers).then(|| self.cluster_bitrate_limit).flatten(); + let cluster_limit = self.meta.control.eq(&BitrateControlMode::DynamicConsumers).then_some(self.cluster_bitrate_limit).flatten(); match (self.allocate_bitrate, cluster_limit) { (Some(b1), Some((min, max))) => Some((min.min(b1), max.min(b1))), (Some(b1), None) => Some((b1, b1)), diff --git a/packages/media_gateway/src/agent_service.rs b/packages/media_gateway/src/agent_service.rs index 0ab06679..c2cf768b 100644 --- a/packages/media_gateway/src/agent_service.rs +++ b/packages/media_gateway/src/agent_service.rs @@ -30,12 +30,8 @@ pub struct GatewayAgentService { _tmp: std::marker::PhantomData<(UserData, SC, SE, TC, TW)>, } -impl GatewayAgentService -where - SC: From + TryInto, - SE: From + TryInto, -{ - pub fn new() -> Self { +impl Default for GatewayAgentService { + fn default() -> Self { Self { output: None, seq: 0, @@ -75,7 +71,7 @@ where } .encode_to_vec(); log::info!("[GatewayAgent] broadcast ping to zone gateways"); - self.output = Some(ServiceOutput::FeatureControl(data::Control::DataSendRule(DATA_PORT, rule, meta, data.into()).into())); + self.output = Some(ServiceOutput::FeatureControl(data::Control::DataSendRule(DATA_PORT, rule, meta, data).into())); } ServiceSharedInput::Connection(_) => {} } @@ -120,8 +116,8 @@ pub struct GatewayAgentServiceBuilder { _tmp: std::marker::PhantomData<(UserData, SC, SE, TC, TW)>, } -impl GatewayAgentServiceBuilder { - pub fn new() -> Self { +impl Default for GatewayAgentServiceBuilder { + fn default() -> Self { Self { _tmp: std::marker::PhantomData } } } @@ -147,7 +143,7 @@ where } fn create(&self) -> Box> { - Box::new(GatewayAgentService::new()) + Box::>::default() } fn create_worker(&self) -> Box> { diff --git a/packages/media_gateway/src/store.rs b/packages/media_gateway/src/store.rs index 26f8649d..210726f9 100644 --- a/packages/media_gateway/src/store.rs +++ b/packages/media_gateway/src/store.rs @@ -86,11 +86,11 @@ impl GatewayStore { } fn node_usage(ping: &PingEvent, max_memory: u8, max_disk: u8) -> Option { - if ping.memory as u8 >= max_memory { + if ping.memory >= max_memory { return None; } - if ping.disk as u8 >= max_disk { + if ping.disk >= max_disk { return None; } @@ -98,14 +98,14 @@ fn node_usage(ping: &PingEvent, max_memory: u8, max_disk: u8) -> Option { } fn webrtc_usage(ping: &PingEvent, max_memory: u8, max_disk: u8) -> Option { - if ping.memory as u8 >= max_memory { + if ping.memory >= max_memory { return None; } - if ping.disk as u8 >= max_disk { + if ping.disk >= max_disk { return None; } let webrtc = ping.webrtc.as_ref()?; - webrtc.active.then(|| (ping.cpu as u8).max(((webrtc.live * 100) / webrtc.max) as u8)) + webrtc.active.then(|| ping.cpu.max(((webrtc.live * 100) / webrtc.max) as u8)) } diff --git a/packages/media_gateway/src/store/service.rs b/packages/media_gateway/src/store/service.rs index 561e48d7..b6c6c47b 100644 --- a/packages/media_gateway/src/store/service.rs +++ b/packages/media_gateway/src/store/service.rs @@ -12,7 +12,7 @@ struct NodeSource { last_updated: u64, } -/// This is for other cluser +/// This is for other cluster struct ZoneSource { zone: u32, usage: u8, @@ -70,6 +70,7 @@ impl ServiceStore { } } + #[allow(clippy::too_many_arguments)] pub fn on_gateway_ping(&mut self, now: u64, zone: u32, gateway: u32, gateway_usage: u8, location: Location, usage: u8, stats: ServiceStats) { if let Some(z) = self.zone_sources.iter_mut().find(|s| s.zone == zone) { z.usage = usage; @@ -116,8 +117,8 @@ impl ServiceStore { } pub fn remove_gateway(&mut self, zone: u32, gateway: u32) { - if let Some((index, z)) = self.zone_sources.iter_mut().enumerate().find(|(i, z)| z.zone == zone) { - if let Some((g_index, g)) = z.gateways.iter_mut().enumerate().find(|(i, g)| g.node == gateway) { + if let Some((index, z)) = self.zone_sources.iter_mut().enumerate().find(|(_i, z)| z.zone == zone) { + if let Some((g_index, _g)) = z.gateways.iter_mut().enumerate().find(|(_i, g)| g.node == gateway) { let g = z.gateways.remove(g_index); log::info!( "[ServiceStore {:?}] zone {zone} at {:?} remove gateway {} gateway usage {}, stats {:?}", @@ -185,7 +186,7 @@ impl Ord for NodeSource { impl PartialOrd for NodeSource { fn partial_cmp(&self, other: &Self) -> Option { - self.usage.partial_cmp(&other.usage) + Some(self.usage.cmp(&other.usage)) } } @@ -204,7 +205,7 @@ impl Ord for ZoneSource { impl PartialOrd for ZoneSource { fn partial_cmp(&self, other: &Self) -> Option { - self.usage.partial_cmp(&other.usage) + Some(self.usage.cmp(&other.usage)) } } diff --git a/packages/media_gateway/src/store_service.rs b/packages/media_gateway/src/store_service.rs index e180e7ad..d1ad9176 100644 --- a/packages/media_gateway/src/store_service.rs +++ b/packages/media_gateway/src/store_service.rs @@ -108,8 +108,7 @@ where })), } .encode_to_vec(); - self.queue - .push_back(ServiceOutput::FeatureControl(data::Control::DataSendRule(DATA_PORT, rule, meta, data.into()).into())); + self.queue.push_back(ServiceOutput::FeatureControl(data::Control::DataSendRule(DATA_PORT, rule, meta, data).into())); } } ServiceSharedInput::Connection(_) => {} diff --git a/packages/media_runner/src/worker.rs b/packages/media_runner/src/worker.rs index 849a9337..6c1c5487 100644 --- a/packages/media_runner/src/worker.rs +++ b/packages/media_runner/src/worker.rs @@ -85,6 +85,7 @@ enum MediaClusterOwner { Webrtc(WebrtcOwner), } +#[allow(clippy::type_complexity)] pub struct MediaServerWorker { sdn_slot: usize, sdn_worker: TaskSwitcherBranch, SdnWorkerOutput>, @@ -97,6 +98,7 @@ pub struct MediaServerWorker { } impl MediaServerWorker { + #[allow(clippy::too_many_arguments)] pub fn new(node_id: u32, session: u64, secret: &str, controller: bool, sdn_udp: u16, sdn_custom_addrs: Vec, sdn_zone: u32, media: MediaConfig) -> Self { let secure = media.secure.clone(); //TODO why need this? let sdn_udp_addr = SocketAddr::from(([0, 0, 0, 0], sdn_udp)); @@ -105,16 +107,16 @@ impl MediaServerWorker { let visualization = Arc::new(visualization::VisualizationServiceBuilder::new(false)); let discovery = Arc::new(manual_discovery::ManualDiscoveryServiceBuilder::new(node_addr, vec![], vec![generate_gateway_zone_tag(sdn_zone)])); - let gateway = Arc::new(GatewayAgentServiceBuilder::new()); + let gateway = Arc::new(GatewayAgentServiceBuilder::default()); let sdn_config = SdnConfig { node_id, controller: if controller { Some(ControllerPlaneCfg { session, - authorization: Arc::new(StaticKeyAuthorization::new(&secret)), + authorization: Arc::new(StaticKeyAuthorization::new(secret)), handshake_builder: Arc::new(HandshakeBuilderXDA), - random: Box::new(OsRng::default()), + random: Box::new(OsRng), services: vec![visualization.clone(), discovery.clone(), gateway.clone()], }) } else { @@ -353,7 +355,7 @@ impl MediaServerWorker { Ok((ice_lite, sdp, conn_id)) => self.queue.push_back(Output::ExtRpc( req_id, RpcRes::Webrtc(webrtc::RpcRes::Connect(Ok(( - conn_id.into(), + conn_id, ConnectResponse { conn_id: "".to_string(), sdp, diff --git a/packages/media_secure/src/jwt.rs b/packages/media_secure/src/jwt.rs index 999b34d6..e44d7e59 100644 --- a/packages/media_secure/src/jwt.rs +++ b/packages/media_secure/src/jwt.rs @@ -2,8 +2,6 @@ use crate::{MediaEdgeSecure, MediaGatewaySecure}; use jwt_simple::prelude::*; use serde::{de::DeserializeOwned, Serialize}; -const HEADER_CONN: &'static str = "C"; - pub struct MediaEdgeSecureJwt { key: HS256Key, } @@ -16,8 +14,10 @@ impl From<&[u8]> for MediaEdgeSecureJwt { impl MediaEdgeSecure for MediaEdgeSecureJwt { fn decode_obj(&self, _type: &'static str, token: &str) -> Option { - let mut options = VerificationOptions::default(); - options.allowed_issuers = Some(HashSet::from_strings(&[_type])); + let options = VerificationOptions { + allowed_issuers: Some(HashSet::from_strings(&[_type])), + ..Default::default() + }; let claims = self.key.verify_token::(token, Some(options)).ok()?; Some(claims.custom) } @@ -28,8 +28,10 @@ impl MediaEdgeSecure for MediaEdgeSecureJwt { } fn decode_conn_id(&self, token: &str) -> Option { - let mut options = VerificationOptions::default(); - options.allowed_issuers = Some(HashSet::from_strings(&["conn"])); + let options = VerificationOptions { + allowed_issuers: Some(HashSet::from_strings(&["conn"])), + ..Default::default() + }; let claims = self.key.verify_token::(token, Some(options)).ok()?; Some(claims.custom) } diff --git a/packages/media_utils/src/f16.rs b/packages/media_utils/src/f16.rs index ade7a93e..6a51f9c7 100644 --- a/packages/media_utils/src/f16.rs +++ b/packages/media_utils/src/f16.rs @@ -3,9 +3,9 @@ #[derive(Debug, PartialEq, Eq)] pub struct F16u(u16); -impl Into for F16u { - fn into(self) -> f32 { - self.0 as f32 / 100.0 +impl From for f32 { + fn from(val: F16u) -> Self { + val.0 as f32 / 100.0 } } @@ -17,7 +17,7 @@ impl F16u { impl PartialOrd for F16u { fn partial_cmp(&self, other: &Self) -> Option { - self.0.partial_cmp(&other.0) + Some(self.0.cmp(&other.0)) } } @@ -30,9 +30,9 @@ impl Ord for F16u { #[derive(Debug, PartialEq, Eq)] pub struct F16i(i16); -impl Into for F16i { - fn into(self) -> f32 { - self.0 as f32 / 100.0 +impl From for f32 { + fn from(val: F16i) -> Self { + val.0 as f32 / 100.0 } } @@ -44,6 +44,6 @@ impl Ord for F16i { impl PartialOrd for F16i { fn partial_cmp(&self, other: &Self) -> Option { - self.0.partial_cmp(&other.0) + Some(self.0.cmp(&other.0)) } } diff --git a/packages/media_utils/src/small_2dmap.rs b/packages/media_utils/src/small_2dmap.rs index ef3d276b..5616561b 100644 --- a/packages/media_utils/src/small_2dmap.rs +++ b/packages/media_utils/src/small_2dmap.rs @@ -55,4 +55,8 @@ impl Small2dMap { pub fn len(&self) -> usize { self.data.len() } + + pub fn is_empty(&self) -> bool { + self.data.is_empty() + } } diff --git a/packages/media_utils/src/ts_rewrite.rs b/packages/media_utils/src/ts_rewrite.rs index a50daefd..3f485d74 100644 --- a/packages/media_utils/src/ts_rewrite.rs +++ b/packages/media_utils/src/ts_rewrite.rs @@ -37,7 +37,7 @@ impl TsRewrite { let now_ts = now_ms as i64 * (self.data_rate as i64 / 1000); self.last_rtp_ts = rtp_ts; - self.delta_ts = now_ts as i64 - rtp_ts as i64; + self.delta_ts = now_ts - rtp_ts as i64; self.state = State::Rewriting; } State::Reinit => { diff --git a/packages/protocol/src/endpoint.rs b/packages/protocol/src/endpoint.rs index 9d9dffe6..5e76f875 100644 --- a/packages/protocol/src/endpoint.rs +++ b/packages/protocol/src/endpoint.rs @@ -19,7 +19,7 @@ impl FromStr for ClusterConnId { type Err = &'static str; fn from_str(s: &str) -> Result { let parts = s.split('-').collect::>(); - let node = parts.get(0).ok_or("MISSING NODE_ID")?.parse::().map_err(|_| "PARSE ERROR NODE_ID")?; + let node = parts.first().ok_or("MISSING NODE_ID")?.parse::().map_err(|_| "PARSE ERROR NODE_ID")?; let node_session = parts.get(1).ok_or("MISSING NODE_SESSION")?.parse::().map_err(|_| "PARSE ERROR NODE_SESSION")?; let server_conn = parts.get(2).ok_or("MISSING SERVER_CONN")?.parse::().map_err(|_| "PARSE ERROR SERVER_CONN")?; Ok(Self { node, node_session, server_conn }) @@ -61,7 +61,7 @@ impl FromStr for ServerConnId { type Err = &'static str; fn from_str(s: &str) -> Result { let parts = s.split(',').collect::>(); - let worker = parts.get(0).ok_or("MISSING WORKER")?.parse::().map_err(|_| "PARSE ERROR WORKER")?; + let worker = parts.first().ok_or("MISSING WORKER")?.parse::().map_err(|_| "PARSE ERROR WORKER")?; let index = parts.get(1).ok_or("MISSING INDEX")?.parse::().map_err(|_| "PARSE ERROR INDEX")?; Ok(Self { worker, index }) } @@ -110,9 +110,7 @@ impl ConnLayer for usize { ServerConnId { index: self, worker: param } } - fn get_down_part(&self) -> Self::DownRes { - () - } + fn get_down_part(&self) -> Self::DownRes {} } #[derive(Debug, Clone, PartialEq, Eq)] diff --git a/packages/protocol/src/media.rs b/packages/protocol/src/media.rs index 358b5bd7..a68a4de7 100644 --- a/packages/protocol/src/media.rs +++ b/packages/protocol/src/media.rs @@ -81,7 +81,7 @@ impl MediaLayerBitrate { return 2; } - return 3; + 3 } } @@ -120,7 +120,7 @@ impl MediaLayersBitrate { return 2; } - return 3; + 3 } pub fn number_temporals(&self) -> u8 { diff --git a/packages/protocol/src/transport.rs b/packages/protocol/src/transport.rs index 78782fd3..447a7f8e 100644 --- a/packages/protocol/src/transport.rs +++ b/packages/protocol/src/transport.rs @@ -96,12 +96,9 @@ impl RpcError { } } -impl Into for RpcError { - fn into(self) -> protobuf::shared::Error { - protobuf::shared::Error { - code: self.code, - message: self.message, - } +impl From for protobuf::shared::Error { + fn from(val: RpcError) -> Self { + protobuf::shared::Error { code: val.code, message: val.message } } } diff --git a/packages/protocol/src/transport/whep.rs b/packages/protocol/src/transport/whep.rs index 6b9ec573..00f1328c 100644 --- a/packages/protocol/src/transport/whep.rs +++ b/packages/protocol/src/transport/whep.rs @@ -63,7 +63,7 @@ impl RpcReq { pub fn get_down_part(&self) -> Option { match self { - RpcReq::Connect(req) => None, + RpcReq::Connect(_req) => None, RpcReq::RemoteIce(req) => Some(req.conn_id.get_down_part()), RpcReq::Delete(req) => Some(req.conn_id.get_down_part()), } @@ -104,14 +104,14 @@ impl TryFrom for WhepConnectReq { } } -impl Into for WhepConnectReq { - fn into(self) -> protobuf::cluster_gateway::WhepConnectRequest { +impl From for protobuf::cluster_gateway::WhepConnectRequest { + fn from(val: WhepConnectReq) -> Self { protobuf::cluster_gateway::WhepConnectRequest { - user_agent: self.user_agent, - ip: self.ip.to_string(), - sdp: self.sdp, - room: self.room.0, - peer: self.peer.0, + user_agent: val.user_agent, + ip: val.ip.to_string(), + sdp: val.sdp, + room: val.room.0, + peer: val.peer.0, } } } diff --git a/packages/protocol/src/transport/whip.rs b/packages/protocol/src/transport/whip.rs index 8158efff..1953f28b 100644 --- a/packages/protocol/src/transport/whip.rs +++ b/packages/protocol/src/transport/whip.rs @@ -63,7 +63,7 @@ impl RpcReq { pub fn get_down_part(&self) -> Option { match self { - RpcReq::Connect(req) => None, + RpcReq::Connect(_req) => None, RpcReq::RemoteIce(req) => Some(req.conn_id.get_down_part()), RpcReq::Delete(req) => Some(req.conn_id.get_down_part()), } @@ -104,14 +104,14 @@ impl TryFrom for WhipConnectReq { } } -impl Into for WhipConnectReq { - fn into(self) -> protobuf::cluster_gateway::WhipConnectRequest { +impl From for protobuf::cluster_gateway::WhipConnectRequest { + fn from(val: WhipConnectReq) -> Self { protobuf::cluster_gateway::WhipConnectRequest { - user_agent: self.user_agent, - ip: self.ip.to_string(), - sdp: self.sdp, - room: self.room.0, - peer: self.peer.0, + user_agent: val.user_agent, + ip: val.ip.to_string(), + sdp: val.sdp, + room: val.room.0, + peer: val.peer.0, } } } diff --git a/packages/transport_webrtc/src/media/mod.rs b/packages/transport_webrtc/src/media/mod.rs index d3ec65a3..faabd0a9 100644 --- a/packages/transport_webrtc/src/media/mod.rs +++ b/packages/transport_webrtc/src/media/mod.rs @@ -59,17 +59,17 @@ impl RemoteMediaConvert { }, ), MediaCodec::H264(profile) => { - let layers = rtp.header.ext_vals.user_values.get::().map(extract_simulcast).flatten(); + let layers = rtp.header.ext_vals.user_values.get::().and_then(extract_simulcast); let meta = h264::parse_rtp(&rtp.payload, profile, spatial)?; (true, layers, meta) } MediaCodec::Vp8 => { - let layers = rtp.header.ext_vals.user_values.get::().map(extract_simulcast).flatten(); + let layers = rtp.header.ext_vals.user_values.get::().and_then(extract_simulcast); let meta = vp8::parse_rtp(&rtp.payload, spatial)?; (true, layers, meta) } MediaCodec::Vp9(profile) => { - let layers = rtp.header.ext_vals.user_values.get::().map(extract_svc).flatten(); + let layers = rtp.header.ext_vals.user_values.get::().and_then(extract_svc); let meta = vp9::parse_rtp(&rtp.payload, profile)?; (true, layers, meta) } diff --git a/packages/transport_webrtc/src/media/vp8.rs b/packages/transport_webrtc/src/media/vp8.rs index a66fc321..9d43a29e 100644 --- a/packages/transport_webrtc/src/media/vp8.rs +++ b/packages/transport_webrtc/src/media/vp8.rs @@ -13,12 +13,13 @@ pub fn parse_rtp(packet: &[u8], rid: Option) -> Option { let mut b = reader.get_u8(); payload_index += 1; - let mut vp8 = Vp8Header::default(); - - vp8.x = (b & 0x80) >> 7; - vp8.n = (b & 0x20) >> 5; - vp8.s = (b & 0x10) >> 4; - vp8.pid = b & 0x07; + let mut vp8 = Vp8Header { + x: (b & 0x80) >> 7, + n: (b & 0x20) >> 5, + s: (b & 0x10) >> 4, + pid: b & 0x07, + ..Default::default() + }; if vp8.x == 1 { b = reader.get_u8(); @@ -155,6 +156,7 @@ struct Vp8Header { /// extended controlbits present pub x: u8, /// when set to 1 this frame can be discarded + #[allow(unused)] pub n: u8, /// start of VP8 partition pub s: u8, diff --git a/packages/transport_webrtc/src/transport.rs b/packages/transport_webrtc/src/transport.rs index bb67e018..a5344632 100644 --- a/packages/transport_webrtc/src/transport.rs +++ b/packages/transport_webrtc/src/transport.rs @@ -292,7 +292,7 @@ impl Transport for TransportWebrtc self.rtc.add_remote_candidate(candidate); } } - self.queue.push_back(TransportOutput::Ext(ExtOut::RemoteIce(req_id, variant, Ok(success_count))).into()); + self.queue.push_back(TransportOutput::Ext(ExtOut::RemoteIce(req_id, variant, Ok(success_count)))); } ExtIn::RestartIce(req_id, variant, _ip, _useragent, req) => { if let Ok(offer) = SdpOffer::from_sdp_string(&req.sdp) { diff --git a/packages/transport_webrtc/src/transport/webrtc.rs b/packages/transport_webrtc/src/transport/webrtc.rs index 91087e62..88ad2efa 100644 --- a/packages/transport_webrtc/src/transport/webrtc.rs +++ b/packages/transport_webrtc/src/transport/webrtc.rs @@ -31,7 +31,7 @@ use prost::Message; use sans_io_runtime::{collections::DynamicDeque, return_if_err, return_if_none}; use str0m::{ bwe::BweKind, - channel::{ChannelData, ChannelId}, + channel::ChannelId, format::CodecConfig, media::{Direction, KeyframeRequestKind, MediaAdded, Mid}, Event as Str0mEvent, IceConnectionState, @@ -435,15 +435,14 @@ impl TransportWebrtcSdk { match state { IceConnectionState::New => {} IceConnectionState::Checking => {} - IceConnectionState::Connected | IceConnectionState::Completed => match &self.state { - State::Reconnecting { at } => { + IceConnectionState::Connected | IceConnectionState::Completed => { + if let State::Reconnecting { at } = &self.state { log::info!("[TransportWebrtcSdk] switched to reconnected after {:?}", now - *at); self.state = State::Connected; self.queue .push_back(InternalOutput::TransportOutput(TransportOutput::Event(TransportEvent::State(TransportState::Connected)))) } - _ => {} - }, + } IceConnectionState::Disconnected => { if matches!(self.state, State::Connected) { self.state = State::Reconnecting { at: now }; @@ -512,10 +511,10 @@ impl TransportWebrtcSdk { Some(receiver_req) => self.on_recever_req(req.req_id, &receiver.name, receiver_req), None => self.send_rpc_res_err(req.req_id, RpcError::new2(WebrtcError::RpcInvalidRequest)), }, - Some(protobuf::conn::request::Request::Room(room)) => { + Some(protobuf::conn::request::Request::Room(_room)) => { todo!() } - Some(protobuf::conn::request::Request::Features(features)) => { + Some(protobuf::conn::request::Request::Features(_features)) => { todo!() } None => self.send_rpc_res_err(req.req_id, RpcError::new2(WebrtcError::RpcInvalidRequest)), @@ -576,7 +575,7 @@ impl TransportWebrtcSdk { } fn on_sender_req(&mut self, req_id: u32, name: &str, req: protobuf::conn::request::sender::Request) { - let track = if let Some(track) = self.remote_track_by_name(&name) { + let track = if let Some(track) = self.remote_track_by_name(name) { track } else { log::warn!("[TransportWebrtcSdk] request from unknown sender {}", name); @@ -628,7 +627,7 @@ impl TransportWebrtcSdk { } fn on_recever_req(&mut self, req_id: u32, name: &str, req: protobuf::conn::request::receiver::Request) { - let track = if let Some(track) = self.local_track_by_name(&name) { + let track = if let Some(track) = self.local_track_by_name(name) { track } else { log::warn!("[TransportWebrtcSdk] request from unknown receiver {}", name); diff --git a/packages/transport_webrtc/src/transport/webrtc/local_track.rs b/packages/transport_webrtc/src/transport/webrtc/local_track.rs index 68d4b480..a1b93b7b 100644 --- a/packages/transport_webrtc/src/transport/webrtc/local_track.rs +++ b/packages/transport_webrtc/src/transport/webrtc/local_track.rs @@ -25,7 +25,7 @@ impl LocalTrack { } pub fn name(&self) -> &str { - &self.name.as_ref() + self.name.as_ref() } pub fn kind(&self) -> MediaKind { diff --git a/packages/transport_webrtc/src/transport/webrtc/remote_track.rs b/packages/transport_webrtc/src/transport/webrtc/remote_track.rs index a46c27dc..2d919318 100644 --- a/packages/transport_webrtc/src/transport/webrtc/remote_track.rs +++ b/packages/transport_webrtc/src/transport/webrtc/remote_track.rs @@ -39,7 +39,7 @@ impl RemoteTrack { } pub fn name(&self) -> &str { - &self.name.as_ref() + self.name.as_ref() } pub fn priority(&self) -> TrackPriority { @@ -80,7 +80,7 @@ impl RemoteTrack { kind: self.kind(), scaling: self.scaling, control: self.config.bitrate().into(), - metadata: self.source.as_ref().map(|s| s.metadata.clone()).flatten(), + metadata: self.source.as_ref().and_then(|s| s.metadata.clone()), } } diff --git a/packages/transport_webrtc/src/transport/whep.rs b/packages/transport_webrtc/src/transport/whep.rs index 2d7ed461..79b21427 100644 --- a/packages/transport_webrtc/src/transport/whep.rs +++ b/packages/transport_webrtc/src/transport/whep.rs @@ -224,15 +224,14 @@ impl TransportWebrtcWhep { match state { IceConnectionState::New => {} IceConnectionState::Checking => {} - IceConnectionState::Connected | IceConnectionState::Completed => match &self.state { - State::Reconnecting { at } => { + IceConnectionState::Connected | IceConnectionState::Completed => { + if let State::Reconnecting { at } = &self.state { log::info!("[TransportWebrtcWhep] switched to reconnected after {:?}", now - *at); self.state = State::Connected; self.queue .push_back(InternalOutput::TransportOutput(TransportOutput::Event(TransportEvent::State(TransportState::Connected)))) } - _ => {} - }, + } IceConnectionState::Disconnected => { if matches!(self.state, State::Connected) { self.state = State::Reconnecting { at: now }; @@ -324,7 +323,6 @@ impl TransportWebrtcWhep { ), ), ))); - return; } } } diff --git a/packages/transport_webrtc/src/transport/whip.rs b/packages/transport_webrtc/src/transport/whip.rs index e23eb161..ce761586 100644 --- a/packages/transport_webrtc/src/transport/whip.rs +++ b/packages/transport_webrtc/src/transport/whip.rs @@ -207,15 +207,14 @@ impl TransportWebrtcWhip { match state { IceConnectionState::New => {} IceConnectionState::Checking => {} - IceConnectionState::Connected | IceConnectionState::Completed => match &self.state { - State::Reconnecting { at } => { + IceConnectionState::Connected | IceConnectionState::Completed => { + if let State::Reconnecting { at } = &self.state { log::info!("[TransportWebrtcWhip] switched to reconnected after {:?}", now - *at); self.state = State::Connected; self.queue .push_back(InternalOutput::TransportOutput(TransportOutput::Event(TransportEvent::State(TransportState::Connected)))); } - _ => {} - }, + } IceConnectionState::Disconnected => { if matches!(self.state, State::Connected) { self.state = State::Reconnecting { at: now }; diff --git a/packages/transport_webrtc/src/worker.rs b/packages/transport_webrtc/src/worker.rs index eff88b92..9adcb791 100644 --- a/packages/transport_webrtc/src/worker.rs +++ b/packages/transport_webrtc/src/worker.rs @@ -36,6 +36,7 @@ pub enum GroupOutput { Continue, } +#[allow(clippy::type_complexity)] pub struct MediaWorkerWebrtc { ice_lite: bool, shared_port: SharedUdpPort,