Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix build warnings and clippy warnings #328

Merged
merged 6 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[workspace]
resolver = "2"
members = [
"bin",
"packages/protocol",
Expand All @@ -12,7 +13,7 @@ members = [
]

[workspace.dependencies]
sans-io-runtime = { git = "https://github.com/giangndm/sans-io-runtime.git", rev = "c781cef12b2a435b5e31a6ede69d301a23719452" }
sans-io-runtime = { git = "https://github.com/giangndm/sans-io-runtime.git", rev = "c781cef12b2a435b5e31a6ede69d301a23719452" , default-features = false}
atm0s-sdn = { git = "https://github.com/giangndm/8xFF-decentralized-sdn.git", rev = "2ad5d3a092b63f871a90a9600d2fcc8cb3027a24" }
tracing-subscriber = { version = "0.3", features = ["env-filter", "std"] }
convert-enum = "0.1"
Expand Down
2 changes: 1 addition & 1 deletion bin/src/http/api_console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ struct ConsoleAuthorization(());

async fn api_checker(req: &Request, api_key: ApiKey) -> Option<()> {
let data = req.data::<ConsoleApisCtx>()?;
data.secure.validate_token(&api_key.key).then(|| ())
data.secure.validate_token(&api_key.key).then_some(())
}
1 change: 1 addition & 0 deletions bin/src/http/api_console/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ impl Apis {
}

/// get events
#[allow(clippy::too_many_arguments)]
#[oai(path = "/:node/log/events", method = "get")]
async fn events(
&self,
Expand Down
4 changes: 2 additions & 2 deletions bin/src/quinn/vnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::vsocket::VirtualUdpSocket;

#[derive(Debug)]
pub struct NetworkPkt {
pub local_port: u16,
pub _local_port: u16,
pub remote: NodeId,
pub remote_port: u16,
pub data: Buffer,
Expand Down Expand Up @@ -77,7 +77,7 @@ impl VirtualNetwork {
let event = event?;
match event {
socket::Event::RecvFrom(local_port, remote, remote_port, data, meta) => {
let pkt = NetworkPkt { data, local_port, remote, remote_port, meta };
let pkt = NetworkPkt { data, _local_port: local_port, remote, remote_port, meta };
if let Some(socket_tx) = self.sockets.get(&local_port) {
if let Err(e) = socket_tx.try_send(pkt) {
log::error!("Send to socket {} error {:?}", local_port, e);
Expand Down
2 changes: 1 addition & 1 deletion bin/src/server/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub struct Args {
pub async fn run_media_connector(workers: usize, node: NodeConfig, args: Args) {
rustls::crypto::ring::default_provider().install_default().expect("should install ring as default");

let mut connector_storage = Arc::new(ConnectorStorage::new(&args.db_uri).await);
let connector_storage = Arc::new(ConnectorStorage::new(&args.db_uri).await);

let default_cluster_cert_buf = include_bytes!("../../certs/cluster.cert");
let default_cluster_key_buf = include_bytes!("../../certs/cluster.key");
Expand Down
8 changes: 4 additions & 4 deletions bin/src/server/console/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl Storage {
ClusterNodeInfo::Console(generic) => {
let zone_id = node & 0xFF_FF_FF_00;
log::info!("Zone {zone_id} on console ping, zones {}", self.zones.len());
let zone = self.zones.entry(zone_id).or_insert_with(Default::default);
let zone = self.zones.entry(zone_id).or_default();
zone.consoles.insert(
node,
ConsoleContainer {
Expand All @@ -165,7 +165,7 @@ impl Storage {
ClusterNodeInfo::Gateway(generic, info) => {
let zone_id = node & 0xFF_FF_FF_00;
log::info!("Zone {zone_id} on gateway ping");
let zone = self.zones.entry(zone_id).or_insert_with(Default::default);
let zone = self.zones.entry(zone_id).or_default();
zone.lat = info.lat;
zone.lon = info.lon;
zone.gateways.insert(
Expand All @@ -181,7 +181,7 @@ impl Storage {
ClusterNodeInfo::Media(generic, info) => {
let zone_id = node & 0xFF_FF_FF_00;
log::info!("Zone {zone_id} on media ping");
let zone = self.zones.entry(zone_id).or_insert_with(Default::default);
let zone = self.zones.entry(zone_id).or_default();
zone.medias.insert(
node,
MediaContainer {
Expand All @@ -195,7 +195,7 @@ impl Storage {
ClusterNodeInfo::Connector(generic) => {
let zone_id = node & 0xFF_FF_FF_00;
log::info!("Zone {zone_id} on connector ping, zones {}", self.zones.len());
let zone = self.zones.entry(zone_id).or_insert_with(Default::default);
let zone = self.zones.entry(zone_id).or_default();
zone.connectors.insert(
node,
ConnectorContainer {
Expand Down
2 changes: 1 addition & 1 deletion bin/src/server/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ pub async fn run_media_gateway(workers: usize, http_port: Option<u16>, node: Nod
media_server_gateway::store_service::Event::FindNodeRes(req_id, res) => requester.on_find_node_res(req_id, res),
},
SdnExtOut::ServicesEvent(_, _, SE::Connector(event)) => match event {
media_server_connector::agent_service::Event::Stats { queue, inflight, acked } => {}
media_server_connector::agent_service::Event::Stats { queue: _, inflight: _, acked: _ } => {}
},
SdnExtOut::FeaturesEvent(_, FeaturesEvent::Socket(event)) => {
if let Err(e) = vnet_tx.try_send(event) {
Expand Down
2 changes: 1 addition & 1 deletion bin/src/server/media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ pub async fn run_media_server(workers: usize, http_port: Option<u16>, node: Node
if let Some(metrics) = node_metrics_collector.pop_measure() {
controller.send_to(
0, //because sdn controller allway is run inside worker 0
ExtIn::NodeStats(metrics).into(),
ExtIn::NodeStats(metrics),
);
}
while let Ok(control) = vnet_rx.try_recv() {
Expand Down
5 changes: 3 additions & 2 deletions bin/src/server/media/runtime_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use media_server_gateway::NodeMetrics;
use media_server_protocol::transport::{RpcReq, RpcRes};
use media_server_runner::{Input as WorkerInput, MediaConfig, MediaServerWorker, Output as WorkerOutput, Owner, UserData, SC, SE, TC, TW};
use media_server_secure::MediaEdgeSecure;
use rand::random;
use sans_io_runtime::{BusChannelControl, BusControl, BusEvent, WorkerInner, WorkerInnerInput, WorkerInnerOutput};

use crate::NodeConfig;
Expand Down Expand Up @@ -58,7 +57,9 @@ impl<ES: 'static + MediaEdgeSecure> WorkerInner<Owner, ExtIn, ExtOut, Channel, E
let worker = MediaServerWorker::new(
index,
cfg.node.node_id,
random(),
// random(),
// NOTE: Not sure if session was supposed to e used here instead of random(); Input Required.
cfg.session,
dhilipsiva marked this conversation as resolved.
Show resolved Hide resolved
&cfg.node.secret,
cfg.controller,
cfg.node.udp_port,
Expand Down
2 changes: 1 addition & 1 deletion packages/audio_mixer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl<Src: Debug + Clone + Eq + Hash> AudioMixer<Src> {
for (i, slot) in self.outputs.iter().enumerate() {
if let Some(OutputSlotState { audio_level, source }) = slot {
if let Some((_, _, lowest_slot_audio_level)) = &mut lowest {
if *audio_level < *lowest_slot_audio_level || (*audio_level == *lowest_slot_audio_level) {
dhilipsiva marked this conversation as resolved.
Show resolved Hide resolved
if *audio_level <= *lowest_slot_audio_level {
lowest = Some((i, source.clone(), *audio_level));
}
} else {
Expand Down
12 changes: 12 additions & 0 deletions packages/media_connector/src/agent_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ pub struct ConnectorAgentService<UserData, SC, SE, TC, TW> {
_tmp: std::marker::PhantomData<(UserData, SC, SE, TC, TW)>,
}

impl<UserData, SC, SE, TC, TW> Default for ConnectorAgentService<UserData, SC, SE, TC, TW> {
fn default() -> Self {
Self::new()
}
}

impl<UserData, SC, SE, TC, TW> ConnectorAgentService<UserData, SC, SE, TC, TW> {
pub fn new() -> Self {
Self {
Expand Down Expand Up @@ -159,6 +165,12 @@ pub struct ConnectorAgentServiceBuilder<UserData, SC, SE, TC, TW> {
_tmp: std::marker::PhantomData<(UserData, SC, SE, TC, TW)>,
}

impl<UserData, SC, SE, TC, TW> Default for ConnectorAgentServiceBuilder<UserData, SC, SE, TC, TW> {
fn default() -> Self {
Self::new()
}
}

impl<UserData, SC, SE, TC, TW> ConnectorAgentServiceBuilder<UserData, SC, SE, TC, TW> {
pub fn new() -> Self {
Self { _tmp: std::marker::PhantomData }
Expand Down
12 changes: 12 additions & 0 deletions packages/media_connector/src/handler_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ pub struct ConnectorHandlerService<UserData, SC, SE, TC, TW> {
_tmp: std::marker::PhantomData<(UserData, SC, SE, TC, TW)>,
}

impl<UserData, SC, SE, TC, TW> Default for ConnectorHandlerService<UserData, SC, SE, TC, TW> {
fn default() -> Self {
Self::new()
}
}

impl<UserData, SC, SE, TC, TW> ConnectorHandlerService<UserData, SC, SE, TC, TW> {
pub fn new() -> Self {
Self {
Expand Down Expand Up @@ -152,6 +158,12 @@ pub struct ConnectorHandlerServiceBuilder<UserData, SC, SE, TC, TW> {
_tmp: std::marker::PhantomData<(UserData, SC, SE, TC, TW)>,
}

impl<UserData, SC, SE, TC, TW> Default for ConnectorHandlerServiceBuilder<UserData, SC, SE, TC, TW> {
fn default() -> Self {
Self::new()
}
}

impl<UserData, SC, SE, TC, TW> ConnectorHandlerServiceBuilder<UserData, SC, SE, TC, TW> {
pub fn new() -> Self {
Self { _tmp: std::marker::PhantomData }
Expand Down
4 changes: 2 additions & 2 deletions packages/media_connector/src/msg_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ impl<M: Message, const MAX_INFLIGHT: usize> MessageQueue<M, MAX_INFLIGHT> {

pub fn pop(&mut self, now_ms: u64) -> Option<&M> {
if let Some(msg_id) = self.pop_retry_msg_id(now_ms) {
let entry = self.inflight_ts.entry(now_ms).or_insert_with(Default::default);
let entry = self.inflight_ts.entry(now_ms).or_default();
entry.push(msg_id);
return Some(self.inflight.get(&msg_id).expect("should exist retry_msg_id"));
}

if self.inflight.len() < MAX_INFLIGHT {
let msg = self.queue.pop_front()?;
let msg_id = msg.msg_id();
let entry = self.inflight_ts.entry(now_ms).or_insert_with(Default::default);
let entry = self.inflight_ts.entry(now_ms).or_default();
entry.push(msg_id);
self.inflight.insert(msg_id, msg);
self.inflight.get(&msg_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ impl MigrationTrait for Migration {
enum Room {
Table,
Id,
#[allow(clippy::enum_variant_names)]
Room,
CreatedAt,
}
Expand All @@ -130,6 +131,7 @@ enum Peer {
Table,
Id,
Room,
#[allow(clippy::enum_variant_names)]
Peer,
CreatedAt,
}
Expand Down Expand Up @@ -165,6 +167,7 @@ enum Event {
NodeTs,
Session,
CreatedAt,
#[allow(clippy::enum_variant_names)]
Event,
Meta,
}
4 changes: 3 additions & 1 deletion packages/media_core/src/cluster/room/audio_mixer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ pub enum Output<Endpoint> {
OnResourceEmpty,
}

type AudioMixerManuals<T> = TaskSwitcherBranch<TaskGroup<manual::Input, Output<T>, ManualMixer<T>, 4>, (usize, Output<T>)>;

pub struct AudioMixer<Endpoint: Clone> {
room: ClusterRoomHash,
mix_channel_id: ChannelId,
Expand All @@ -64,7 +66,7 @@ pub struct AudioMixer<Endpoint: Clone> {
subscriber1: TaskSwitcherBranch<AudioMixerSubscriber<Endpoint, 1>, Output<Endpoint>>,
subscriber2: TaskSwitcherBranch<AudioMixerSubscriber<Endpoint, 2>, Output<Endpoint>>,
subscriber3: TaskSwitcherBranch<AudioMixerSubscriber<Endpoint, 3>, Output<Endpoint>>,
manuals: TaskSwitcherBranch<TaskGroup<manual::Input, Output<Endpoint>, ManualMixer<Endpoint>, 4>, (usize, Output<Endpoint>)>,
manuals: AudioMixerManuals<Endpoint>,
switcher: TaskSwitcher,
last_tick: Instant,
}
Expand Down
8 changes: 4 additions & 4 deletions packages/media_core/src/cluster/room/audio_mixer/manual.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ impl<Endpoint: Clone> ManualMixer<Endpoint> {

fn attach(&mut self, _now: Instant, source: TrackSource) {
let channel_id = id_generator::gen_channel_id(self.room, &source.peer, &source.track);
if !self.sources.contains_key(&channel_id) {
if let std::collections::hash_map::Entry::Vacant(e) = self.sources.entry(channel_id) {
log::info!("[ClusterManualMixer] add source {:?} => sub {channel_id}", source);
self.sources.insert(channel_id, source);
e.insert(source);
self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, pubsub::ChannelControl::SubAuto)));
}
}
Expand Down Expand Up @@ -85,7 +85,7 @@ impl<Endpoint: Clone> ManualMixer<Endpoint> {

fn detach(&mut self, _now: Instant, source: TrackSource) {
let channel_id = id_generator::gen_channel_id(self.room, &source.peer, &source.track);
if let Some(_) = self.sources.remove(&channel_id) {
if self.sources.remove(&channel_id).is_some() {
log::info!("[ClusterManualMixer] remove source {:?} => unsub {channel_id}", source);
self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, pubsub::ChannelControl::UnsubAuto)));
}
Expand Down Expand Up @@ -124,7 +124,7 @@ impl<Endpoint: Clone> Task<Input, Output<Endpoint>> for ManualMixer<Endpoint> {
Input::LeaveRoom => {
// We need manual release sources because it is from client request,
// we cannot ensure client will release it before it disconnect.
let sources = std::mem::replace(&mut self.sources, Default::default());
let sources = std::mem::take(&mut self.sources);
for (channel_id, source) in sources {
log::info!("[ClusterManualMixer] remove source {:?} on queue => unsub {channel_id}", source);
self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, pubsub::ChannelControl::UnsubAuto)));
Expand Down
4 changes: 3 additions & 1 deletion packages/media_core/src/endpoint/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ pub enum InternalOutput {
Destroy,
}

type EndpointInternalWaitJoin = Option<(EndpointReqId, RoomId, PeerId, PeerMeta, RoomInfoPublish, RoomInfoSubscribe, Option<AudioMixerConfig>)>;

pub struct EndpointInternal {
cfg: EndpointCfg,
state: Option<(Instant, TransportState)>,
wait_join: Option<(EndpointReqId, RoomId, PeerId, PeerMeta, RoomInfoPublish, RoomInfoSubscribe, Option<AudioMixerConfig>)>,
wait_join: EndpointInternalWaitJoin,
joined: Option<(ClusterRoomHash, RoomId, PeerId, Option<AudioMixerMode>)>,
local_tracks_id: Small2dMap<LocalTrackId, usize>,
remote_tracks_id: Small2dMap<RemoteTrackId, usize>,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::VecDeque;
use std::{cmp::Ordering, collections::VecDeque};

use media_server_protocol::media::{MediaLayersBitrate, MediaMeta, MediaPacket};

Expand Down Expand Up @@ -53,36 +53,42 @@ impl Selector {
if let MediaMeta::H264 { key, profile: _, sim: Some(_sim) } = &mut pkt.meta {
match (self.current, self.target) {
(Some(current), Some(target)) => {
if target < current {
//down spatial => need wait key-frame
if *key {
log::info!("[H264SimSelector] down {} => {} with key", current, target);
ctx.seq_rewrite.reinit();
ctx.ts_rewrite.reinit();
self.current = self.target;
} else {
self.queue.push_back(Action::RequestKeyFrame);
match target.cmp(&current) {
Ordering::Less => {
// down spatial => need wait key-frame
if *key {
log::info!("[H264SimSelector] down {} => {} with key", current, target);
ctx.seq_rewrite.reinit();
ctx.ts_rewrite.reinit();
self.current = self.target;
} else {
self.queue.push_back(Action::RequestKeyFrame);
}
}
} else if target > current {
//up spatial => need wait key-frame
if *key {
log::info!("[H264SimSelector] up {} => {} with key", current, target);
ctx.seq_rewrite.reinit();
ctx.ts_rewrite.reinit();
self.current = Some(target);
} else if !*key {
self.queue.push_back(Action::RequestKeyFrame);
Ordering::Greater => {
// up spatial => need wait key-frame
if *key {
log::info!("[H264SimSelector] up {} => {} with key", current, target);
ctx.seq_rewrite.reinit();
ctx.ts_rewrite.reinit();
self.current = Some(target);
} else {
self.queue.push_back(Action::RequestKeyFrame);
}
}
Ordering::Equal => {
// target is equal to current, handle if needed
}
}
}
(Some(current), None) => {
//need pause
//TODO wait current frame finished for avoiding interrupt client
// need pause
// TODO: wait current frame finished for avoiding interrupt client
log::info!("[H264SimSelector] pause from {}", current);
self.current = None;
}
(None, Some(target)) => {
//need resume or start => need wait key_frame
// need resume or start => need wait key-frame
if *key {
log::info!("[H264SimSelector] resume to {} with key", target);
// with other spatial we have difference tl0xidx and pic_id offset
Expand All @@ -91,7 +97,7 @@ impl Selector {
}
}
(None, None) => {
//reject
// reject
}
}
}
Expand Down
Loading