Skip to content

Commit

Permalink
fix: typos and clippy warns (#296)
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm committed May 28, 2024
1 parent 5e666d2 commit 465e4a0
Show file tree
Hide file tree
Showing 51 changed files with 213 additions and 221 deletions.
1 change: 1 addition & 0 deletions bin/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ impl<Req, Res> Rpc<Req, Res> {
(Self { req, answer_tx }, answer_rx)
}

#[allow(unused)]
pub fn res(self, res: Res) {
let _ = self.answer_tx.send(res);
}
Expand Down
18 changes: 6 additions & 12 deletions bin/src/http/utils/payload_sdp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@ impl<T: Send> Payload for ApplicationSdp<T> {
const CONTENT_TYPE: &'static str = "application/sdp";

fn check_content_type(content_type: &str) -> bool {
match content_type {
"application/sdp" => true,
_ => false,
}
content_type.eq(Self::CONTENT_TYPE)
}

fn schema_ref() -> MetaSchemaRef {
Expand All @@ -49,8 +46,8 @@ impl<T: Send> Payload for ApplicationSdp<T> {
impl ParsePayload for ApplicationSdp<String> {
const IS_REQUIRED: bool = true;

fn from_request(request: &Request, body: &mut RequestBody) -> impl std::future::Future<Output = Result<Self>> + Send {
async move { Ok(Self(String::from_request(request, body).await?)) }
async fn from_request(request: &Request, body: &mut RequestBody) -> Result<Self> {
Ok(Self(String::from_request(request, body).await?))
}
}

Expand Down Expand Up @@ -100,10 +97,7 @@ impl<T: Send> Payload for ApplicationSdpPatch<T> {
const CONTENT_TYPE: &'static str = "application/trickle-ice-sdpfrag";

fn check_content_type(content_type: &str) -> bool {
match content_type {
"application/trickle-ice-sdpfrag" => true,
_ => false,
}
content_type.eq(Self::CONTENT_TYPE)
}

fn schema_ref() -> MetaSchemaRef {
Expand All @@ -114,8 +108,8 @@ impl<T: Send> Payload for ApplicationSdpPatch<T> {
impl ParsePayload for ApplicationSdpPatch<String> {
const IS_REQUIRED: bool = true;

fn from_request(request: &Request, body: &mut RequestBody) -> impl std::future::Future<Output = Result<Self>> + Send {
async move { Ok(Self(String::from_request(request, body).await?)) }
async fn from_request(request: &Request, body: &mut RequestBody) -> Result<Self> {
Ok(Self(String::from_request(request, body).await?))
}
}

Expand Down
32 changes: 13 additions & 19 deletions bin/src/http/utils/remote_ip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,19 @@ use poem::{http::StatusCode, FromRequest};
pub struct RemoteIpAddr(pub IpAddr);

impl<'a> FromRequest<'a> for RemoteIpAddr {
fn from_request(req: &'a poem::Request, _body: &mut poem::RequestBody) -> impl std::future::Future<Output = poem::Result<Self>> + Send {
async move {
let headers = req.headers();
if let Some(remote_addr) = headers.get("X-Forwarded-For") {
let remote_addr = remote_addr.to_str().map_err(|_| poem::Error::from_string("Bad Request", StatusCode::BAD_REQUEST))?;
let remote_addr = remote_addr.split(',').next().ok_or(poem::Error::from_string("Bad Request", StatusCode::BAD_REQUEST))?;
return Ok(RemoteIpAddr(remote_addr.parse().map_err(|_| poem::Error::from_string("Invalid IP address", StatusCode::BAD_REQUEST))?));
} else if let Some(remote_addr) = headers.get("X-Real-IP") {
let remote_addr = remote_addr.to_str().map_err(|_| poem::Error::from_string("Bad Request", StatusCode::BAD_REQUEST))?;
return Ok(RemoteIpAddr(remote_addr.parse().map_err(|_| poem::Error::from_string("Invalid IP address", StatusCode::BAD_REQUEST))?));
} else {
match req.remote_addr().deref() {
poem::Addr::SocketAddr(addr) => {
return Ok(RemoteIpAddr(addr.ip()));
}
_ => {
return Err(poem::Error::from_string("Bad Request", StatusCode::BAD_REQUEST));
}
}
async fn from_request(req: &'a poem::Request, _body: &mut poem::RequestBody) -> poem::Result<Self> {
let headers = req.headers();
if let Some(remote_addr) = headers.get("X-Forwarded-For") {
let remote_addr = remote_addr.to_str().map_err(|_| poem::Error::from_string("Bad Request", StatusCode::BAD_REQUEST))?;
let remote_addr = remote_addr.split(',').next().ok_or(poem::Error::from_string("Bad Request", StatusCode::BAD_REQUEST))?;
Ok(RemoteIpAddr(remote_addr.parse().map_err(|_| poem::Error::from_string("Invalid IP address", StatusCode::BAD_REQUEST))?))
} else if let Some(remote_addr) = headers.get("X-Real-IP") {
let remote_addr = remote_addr.to_str().map_err(|_| poem::Error::from_string("Bad Request", StatusCode::BAD_REQUEST))?;
Ok(RemoteIpAddr(remote_addr.parse().map_err(|_| poem::Error::from_string("Invalid IP address", StatusCode::BAD_REQUEST))?))
} else {
match req.remote_addr().deref() {
poem::Addr::SocketAddr(addr) => Ok(RemoteIpAddr(addr.ip())),
_ => Err(poem::Error::from_string("Bad Request", StatusCode::BAD_REQUEST)),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion bin/src/http/utils/token.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use poem_openapi::{auth::Bearer, OpenApi, SecurityScheme};
use poem_openapi::{auth::Bearer, SecurityScheme};

#[derive(SecurityScheme)]
#[oai(rename = "Token Authorization", ty = "bearer", key_in = "header", key_name = "Authorization")]
Expand Down
12 changes: 5 additions & 7 deletions bin/src/http/utils/user_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ use poem::{http::StatusCode, FromRequest};
pub struct UserAgent(pub String);

impl<'a> FromRequest<'a> for UserAgent {
fn from_request(req: &'a poem::Request, _body: &mut poem::RequestBody) -> impl std::future::Future<Output = poem::Result<Self>> + Send {
async move {
let headers = req.headers();
let user_agent = headers.get("User-Agent").ok_or(poem::Error::from_string("Bad Request", StatusCode::BAD_REQUEST))?;
let user_agent = user_agent.to_str().map_err(|_| poem::Error::from_string("Bad Request", StatusCode::BAD_REQUEST))?;
Ok(UserAgent(user_agent.into()))
}
async fn from_request(req: &'a poem::Request, _body: &mut poem::RequestBody) -> poem::Result<Self> {
let headers = req.headers();
let user_agent = headers.get("User-Agent").ok_or(poem::Error::from_string("Bad Request", StatusCode::BAD_REQUEST))?;
let user_agent = user_agent.to_str().map_err(|_| poem::Error::from_string("Bad Request", StatusCode::BAD_REQUEST))?;
Ok(UserAgent(user_agent.into()))
}
}
6 changes: 5 additions & 1 deletion bin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,11 @@ async fn main() {
#[cfg(feature = "media")]
server::ServerType::Media(args) => server::run_media_server(workers, http_port, node, args).await,
#[cfg(feature = "cert_utils")]
server::ServerType::Cert(args) => server::run_cert_utils(args).await,
server::ServerType::Cert(args) => {
if let Err(e) = server::run_cert_utils(args).await {
log::error!("create cert error {:?}", e);
}
}
}
})
.await;
Expand Down
16 changes: 8 additions & 8 deletions bin/src/quinn/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub fn make_quinn_server(socket: VirtualUdpSocket, priv_key: PrivatePkcs8KeyDer<
pub fn make_quinn_client(socket: VirtualUdpSocket, server_certs: &[CertificateDer]) -> Result<Endpoint, Box<dyn Error>> {
let runtime = Arc::new(TokioRuntime);
let mut config = EndpointConfig::default();
//Note that client mtu size shoud be smaller than server's
//Note that client mtu size should be smaller than server's
config.max_udp_payload_size(1400).expect("Should config quinn client max_size to 1400");
let mut endpoint = Endpoint::new_with_abstract_socket(config, None, Arc::new(socket), runtime)?;
endpoint.set_default_client_config(configure_client(server_certs)?);
Expand Down Expand Up @@ -70,11 +70,11 @@ impl SkipServerVerification {
}

impl ServerCertVerifier for SkipServerVerification {
fn verify_tls12_signature(&self, message: &[u8], cert: &CertificateDer<'_>, dss: &rustls::DigitallySignedStruct) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
fn verify_tls12_signature(&self, _message: &[u8], _cert: &CertificateDer<'_>, _dss: &rustls::DigitallySignedStruct) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
}

fn verify_tls13_signature(&self, message: &[u8], cert: &CertificateDer<'_>, dss: &rustls::DigitallySignedStruct) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
fn verify_tls13_signature(&self, _message: &[u8], _cert: &CertificateDer<'_>, _dss: &rustls::DigitallySignedStruct) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
}

Expand All @@ -84,11 +84,11 @@ impl ServerCertVerifier for SkipServerVerification {

fn verify_server_cert(
&self,
end_entity: &CertificateDer<'_>,
intermediates: &[CertificateDer<'_>],
server_name: &rustls::pki_types::ServerName<'_>,
ocsp_response: &[u8],
now: rustls::pki_types::UnixTime,
_end_entity: &CertificateDer<'_>,
_intermediates: &[CertificateDer<'_>],
_server_name: &rustls::pki_types::ServerName<'_>,
_ocsp_response: &[u8],
_now: rustls::pki_types::UnixTime,
) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
Ok(rustls::client::danger::ServerCertVerified::assertion())
}
Expand Down
1 change: 0 additions & 1 deletion bin/src/quinn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,3 @@ mod vsocket;

pub use builder::{make_quinn_client, make_quinn_server};
pub use vnet::VirtualNetwork;
pub use vsocket::VirtualUdpSocket;
2 changes: 1 addition & 1 deletion bin/src/quinn/vsocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub struct Poller {}

impl UdpPoller for Poller {
fn poll_writable(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<std::io::Result<()>> {
//TODO implement this for better performace
//TODO implement this for better performance
Poll::Ready(Ok(()))
}
}
Expand Down
9 changes: 5 additions & 4 deletions bin/src/server/cert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ pub struct Args {
domains: Vec<String>,
}

pub async fn run_cert_utils(args: Args) {
let cert = rcgen::generate_simple_self_signed(args.domains).unwrap();
pub async fn run_cert_utils(args: Args) -> Result<(), Box<dyn std::error::Error>> {
let cert = rcgen::generate_simple_self_signed(args.domains)?;
let start = SystemTime::now();
let since_the_epoch = start.duration_since(UNIX_EPOCH).expect("Time went backwards").as_millis();
std::fs::write(format!("./certificate-{}.cert", since_the_epoch), cert.cert.der().to_vec()).unwrap();
std::fs::write(format!("./certificate-{}.key", since_the_epoch), cert.key_pair.serialize_der().to_vec()).unwrap();
std::fs::write(format!("./certificate-{}.cert", since_the_epoch), cert.cert.der())?;
std::fs::write(format!("./certificate-{}.key", since_the_epoch), cert.key_pair.serialize_der())?;
Ok(())
}
2 changes: 1 addition & 1 deletion bin/src/server/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ use clap::Parser;
#[derive(Debug, Parser)]
pub struct Args {}

pub async fn run_media_connector(workers: usize, args: Args) {
pub async fn run_media_connector(_workers: usize, _args: Args) {
println!("Running media connector");
}
4 changes: 2 additions & 2 deletions bin/src/server/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ pub async fn run_media_gateway(workers: usize, http_port: Option<u16>, node: Nod
let (mut vnet, vnet_tx, mut vnet_rx) = VirtualNetwork::new(node.node_id);

let media_rpc_socket = vnet.udp_socket(0).await.expect("Should open virtual port for gateway rpc");
let media_rpc_client = MediaEdgeServiceClient::new(QuinnClient::new(make_quinn_client(media_rpc_socket, &vec![]).expect("Should create endpoint for media rpc client")));
let media_rpc_client = MediaEdgeServiceClient::new(QuinnClient::new(make_quinn_client(media_rpc_socket, &[]).expect("Should create endpoint for media rpc client")));

let media_rpc_socket = vnet.udp_socket(GATEWAY_RPC_PORT).await.expect("Should open virtual port for gateway rpc");
let mut media_rpc_server = MediaEdgeServiceServer::new(
Expand All @@ -123,7 +123,7 @@ pub async fn run_media_gateway(workers: usize, http_port: Option<u16>, node: Nod
media_rpc_server.run().await;
});

tokio::task::spawn_local(async move { while let Some(_) = vnet.recv().await {} });
tokio::task::spawn_local(async move { while vnet.recv().await.is_some() {} });

loop {
if controller.process().is_none() {
Expand Down
8 changes: 5 additions & 3 deletions bin/src/server/gateway/dest_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ use tokio::sync::{
oneshot,
};

type QueryRequest = (ServiceKind, Option<(f32, f32)>, oneshot::Sender<Option<u32>>);

#[derive(Clone)]
pub struct GatewayDestSelector {
tx: Sender<(ServiceKind, Option<(f32, f32)>, oneshot::Sender<Option<u32>>)>,
tx: Sender<QueryRequest>,
}

impl GatewayDestSelector {
Expand All @@ -21,7 +23,7 @@ impl GatewayDestSelector {
}

pub struct GatewayDestRequester {
rx: Receiver<(ServiceKind, Option<(f32, f32)>, oneshot::Sender<Option<u32>>)>,
rx: Receiver<QueryRequest>,
req_seed: u64,
reqs: HashMap<u64, oneshot::Sender<Option<u32>>>,
}
Expand All @@ -31,7 +33,7 @@ impl GatewayDestRequester {
match event {
media_server_gateway::store_service::Event::FindNodeRes(req_id, res) => {
if let Some(tx) = self.reqs.remove(&req_id) {
if let Err(_) = tx.send(res) {
if tx.send(res).is_err() {
log::error!("[GatewayDestRequester] answer for req_id {req_id} error");
}
}
Expand Down
6 changes: 3 additions & 3 deletions bin/src/server/gateway/rpc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub struct MediaRpcHandlerImpl {}
impl MediaEdgeServiceHandler<Ctx> for MediaRpcHandlerImpl {
async fn whip_connect(&self, ctx: &Ctx, req: WhipConnectRequest) -> Option<WhipConnectResponse> {
log::info!("On whip_connect from other gateway");
let location = req.ip.parse().ok().map(|ip| ctx.ip2location.get_location(&ip)).flatten();
let location = req.ip.parse().ok().and_then(|ip| ctx.ip2location.get_location(&ip));
let dest = ctx.selector.select(ServiceKind::Webrtc, location).await?;
let dest_addr = node_vnet_addr(dest, GATEWAY_RPC_PORT);
ctx.client.whip_connect(dest_addr, req).await
Expand All @@ -55,7 +55,7 @@ impl MediaEdgeServiceHandler<Ctx> for MediaRpcHandlerImpl {

async fn whep_connect(&self, ctx: &Ctx, req: WhepConnectRequest) -> Option<WhepConnectResponse> {
log::info!("On whep_connect from other gateway");
let location = req.ip.parse().ok().map(|ip| ctx.ip2location.get_location(&ip)).flatten();
let location = req.ip.parse().ok().and_then(|ip| ctx.ip2location.get_location(&ip));
let dest = ctx.selector.select(ServiceKind::Webrtc, location).await?;
let dest_addr = node_vnet_addr(dest, GATEWAY_RPC_PORT);
ctx.client.whep_connect(dest_addr, req).await
Expand All @@ -79,7 +79,7 @@ impl MediaEdgeServiceHandler<Ctx> for MediaRpcHandlerImpl {

async fn webrtc_connect(&self, ctx: &Ctx, req: WebrtcConnectRequest) -> Option<WebrtcConnectResponse> {
log::info!("On webrtc_connect from other gateway");
let location = req.ip.parse().ok().map(|ip| ctx.ip2location.get_location(&ip)).flatten();
let location = req.ip.parse().ok().and_then(|ip| ctx.ip2location.get_location(&ip));
let dest = ctx.selector.select(ServiceKind::Webrtc, location).await?;
let dest_addr = node_vnet_addr(dest, GATEWAY_RPC_PORT);
ctx.client.webrtc_connect(dest_addr, req).await
Expand Down
4 changes: 2 additions & 2 deletions bin/src/server/media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ pub async fn run_media_server(workers: usize, http_port: Option<u16>, node: Node
media_rpc_server.run().await;
});

tokio::task::spawn_local(async move { while let Some(_) = vnet.recv().await {} });
tokio::task::spawn_local(async move { while vnet.recv().await.is_some() {} });

loop {
if controller.process().is_none() {
Expand Down Expand Up @@ -160,7 +160,7 @@ pub async fn run_media_server(workers: usize, http_port: Option<u16>, node: Node
log::info!("on req {req_id} res from worker {worker}");
let res = res.up(worker).up((node_id, node_session));
if let Some(tx) = reqs.remove(&req_id) {
if let Err(_) = tx.send(res) {
if tx.send(res).is_err() {
log::error!("Send rpc response error for req {req_id}");
}
}
Expand Down
8 changes: 4 additions & 4 deletions bin/src/server/media/rpc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl MediaEdgeServiceHandler<Ctx> for MediaRpcHandlerImpl {
let res = rx.await.ok()?;
//TODO process with ICE restart
match res {
RpcRes::Whip(whip::RpcRes::RemoteIce(res)) => res.ok().map(|r| WhipRemoteIceResponse { conn }),
RpcRes::Whip(whip::RpcRes::RemoteIce(res)) => res.ok().map(|_r| WhipRemoteIceResponse { conn }),
_ => None,
}
}
Expand All @@ -70,7 +70,7 @@ impl MediaEdgeServiceHandler<Ctx> for MediaRpcHandlerImpl {
let res = rx.await.ok()?;
//TODO process with ICE restart
match res {
RpcRes::Whip(whip::RpcRes::Delete(res)) => res.ok().map(|r| WhipCloseResponse { conn }),
RpcRes::Whip(whip::RpcRes::Delete(res)) => res.ok().map(|_r| WhipCloseResponse { conn }),
_ => None,
}
}
Expand Down Expand Up @@ -100,7 +100,7 @@ impl MediaEdgeServiceHandler<Ctx> for MediaRpcHandlerImpl {
let res = rx.await.ok()?;
//TODO process with ICE restart
match res {
RpcRes::Whep(whep::RpcRes::RemoteIce(res)) => res.ok().map(|r| WhepRemoteIceResponse { conn }),
RpcRes::Whep(whep::RpcRes::RemoteIce(res)) => res.ok().map(|_r| WhepRemoteIceResponse { conn }),
_ => None,
}
}
Expand All @@ -114,7 +114,7 @@ impl MediaEdgeServiceHandler<Ctx> for MediaRpcHandlerImpl {
let res = rx.await.ok()?;
//TODO process with ICE restart
match res {
RpcRes::Whep(whep::RpcRes::Delete(res)) => res.ok().map(|r| WhepCloseResponse { conn }),
RpcRes::Whep(whep::RpcRes::Delete(res)) => res.ok().map(|_r| WhepCloseResponse { conn }),
_ => None,
}
}
Expand Down
2 changes: 1 addition & 1 deletion bin/src/server/media/runtime_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl<ES: 'static + MediaEdgeSecure> WorkerInner<Owner, ExtIn, ExtOut, Channel, E
cfg.node.zone,
cfg.media,
);
log::info!("creted worker");
log::info!("created worker");
MediaRuntimeWorker { index, worker, queue }
}

Expand Down
2 changes: 1 addition & 1 deletion packages/media_core/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl<Owner: Debug + Hash + Copy + Clone + Debug + Eq> MediaCluster<Owner> {
}
}

pub fn shutdown<'a>(&mut self, now: Instant) {
pub fn shutdown(&mut self, now: Instant) {
self.rooms.on_shutdown(now);
}
}
Expand Down
4 changes: 2 additions & 2 deletions packages/media_core/src/cluster/room/channel_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl<Owner: Hash + Eq + Copy + Debug> RoomChannelSubscribe<Owner> {
owner
);
self.subscribers.insert((owner, track), (channel_id, target_peer, target_track));
let channel_container = self.channels.entry(channel_id).or_insert(Default::default());
let channel_container = self.channels.entry(channel_id).or_default();
channel_container.owners.push((owner, track));
if channel_container.owners.len() == 1 {
log::info!("[ClusterRoom {}/Subscribers] first subscriber => Sub channel {channel_id}", self.room);
Expand Down Expand Up @@ -136,7 +136,7 @@ impl<Owner: Hash + Eq + Copy + Debug> RoomChannelSubscribe<Owner> {
if let Some(sum_fb) = &mut sum_fb {
*sum_fb = *sum_fb + *fb;
} else {
sum_fb = Some(fb.clone());
sum_fb = Some(*fb);
}
}
log::debug!("[ClusterRoom {}/Subscribers] channel {channel_id} setting desired bitrate {:?}", self.room, sum_fb);
Expand Down
Loading

0 comments on commit 465e4a0

Please sign in to comment.