Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat connector server #120

Merged
merged 5 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions packages/cluster/src/define/endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use transport::TrackId;

use crate::{
ClusterEndpointError, ClusterLocalTrackIncomingEvent, ClusterLocalTrackOutgoingEvent, ClusterPeerId, ClusterRemoteTrackIncomingEvent, ClusterRemoteTrackOutgoingEvent, ClusterTrackMeta,
ClusterTrackName, ClusterTrackUuid,
rpc::connector::MediaEndpointLogRequest, ClusterEndpointError, ClusterLocalTrackIncomingEvent, ClusterLocalTrackOutgoingEvent, ClusterPeerId, ClusterRemoteTrackIncomingEvent,
ClusterRemoteTrackOutgoingEvent, ClusterTrackMeta, ClusterTrackName, ClusterTrackUuid,
};

#[async_trait::async_trait]
Expand All @@ -19,6 +19,7 @@ pub enum ClusterEndpointOutgoingEvent {
UnsubscribePeer(ClusterPeerId),
LocalTrackEvent(TrackId, ClusterLocalTrackOutgoingEvent),
RemoteTrackEvent(TrackId, ClusterTrackUuid, ClusterRemoteTrackOutgoingEvent),
MediaEndpointLog(MediaEndpointLogRequest),
}

#[derive(Clone, PartialEq, Eq, Debug)]
Expand Down
3 changes: 3 additions & 0 deletions packages/cluster/src/define/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::fmt::Debug;

pub mod connector;
pub mod gateway;
pub mod general;
pub mod webrtc;
Expand Down Expand Up @@ -57,3 +58,5 @@ pub const RPC_WHEP_CONNECT: &str = "RPC_WHEP_CONNECT";

pub const RPC_NODE_PING: &str = "RPC_NODE_PING";
pub const RPC_NODE_HEALTHCHECK: &str = "RPC_NODE_HEALTHCHECK";

pub const RPC_MEDIA_ENDPOINT_LOG: &str = "RPC_MEDIA_ENDPOINT_LOG";
165 changes: 165 additions & 0 deletions packages/cluster/src/define/rpc/connector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
use std::net::SocketAddr;

use atm0s_sdn::NodeId;
use media_utils::F32;
use proc_macro::{IntoVecU8, TryFromSliceU8};
use serde::{Deserialize, Serialize};
use transport::MediaKind;

#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum MediaStreamIssueType {
Connectivity { mos: F32<2>, lost_percents: F32<2>, jitter_ms: F32<2>, rtt_ms: u32 },
}

#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum MediaEndpointEvent {
Routing {
user_agent: String,
gateway_node_id: NodeId,
},
RoutingError {
reason: String,
gateway_node_id: NodeId,
media_node_ids: Vec<NodeId>,
},
Routed {
media_node_id: NodeId,
after_ms: u32,
},
Connecting {
user_agent: String,
remote: Option<SocketAddr>,
},
ConnectError {
remote: Option<SocketAddr>,
error_code: String,
error_message: String,
},
Connected {
after_ms: u32,
remote: Option<SocketAddr>,
},
Reconnecting {
reason: String,
},
Reconnected {
remote: Option<SocketAddr>,
},
Disconnected {
error: Option<String>,
sent_bytes: u64,
received_bytes: u64,
duration_ms: u64,
rtt: F32<2>,
},
SessionStats {
received_bytes: u64,
receive_limit_bitrate: u32,
sent_bytes: u64,
send_est_bitrate: u32,
rtt: u16,
},
}

#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum MediaReceiveStreamEvent {
StreamStarted {
name: String,
kind: MediaKind,
remote_peer: String,
remote_stream: String,
},
StreamIssue {
name: String,
kind: MediaKind,
remote_peer: String,
remote_stream: String,
issue: MediaStreamIssueType,
},
StreamStats {
name: String,
kind: MediaKind,
limit_bitrate: u32,
received_bytes: u64,
freeze: bool,
mos: Option<F32<2>>,
rtt: Option<u32>,
jitter: Option<F32<2>>,
lost: Option<F32<2>>,
},
StreamEnded {
name: String,
kind: MediaKind,
sent_bytes: u64,
freeze_count: u32,
duration_ms: u64,
mos: Option<(F32<2>, F32<2>, F32<2>)>,
rtt: Option<(F32<2>, F32<2>, F32<2>)>,
jitter: Option<(F32<2>, F32<2>, F32<2>)>,
lost: Option<(F32<2>, F32<2>, F32<2>)>,
},
}

#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum MediaSendStreamEvent {
StreamStarted {
name: String,
kind: MediaKind,
meta: String,
scaling: String,
},
StreamIssue {
name: String,
kind: MediaKind,
issue: MediaStreamIssueType,
},
StreamStats {
name: String,
kind: MediaKind,
sent_bytes: u64,
freeze: bool,
mos: Option<F32<2>>,
rtt: Option<u32>,
jitter: Option<F32<2>>,
lost: Option<F32<2>>,
},
StreamEnded {
name: String,
kind: MediaKind,
received_bytes: u64,
duration_ms: u64,
freeze_count: u32,
mos: Option<(F32<2>, F32<2>, F32<2>)>,
rtt: Option<(F32<2>, F32<2>, F32<2>)>,
jitter: Option<(F32<2>, F32<2>, F32<2>)>,
lost: Option<(F32<2>, F32<2>, F32<2>)>,
},
}

#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, IntoVecU8, TryFromSliceU8)]
pub enum MediaEndpointLogRequest {
SessionEvent {
ip: String,
version: Option<String>,
location: Option<(F32<2>, F32<2>)>,
token: Vec<u8>,
ts: u64,
session_uuid: u64,
event: MediaEndpointEvent,
},
ReceiveStreamEvent {
token: Vec<u8>,
ts: u64,
session_uuid: u64,
event: MediaReceiveStreamEvent,
},
SendStreamEvent {
token: Vec<u8>,
ts: u64,
session_uuid: u64,
event: MediaSendStreamEvent,
},
}

#[derive(PartialEq, Clone, Debug, Serialize, Deserialize, IntoVecU8, TryFromSliceU8)]
pub struct MediaEndpointLogResponse {}
3 changes: 3 additions & 0 deletions packages/cluster/src/define/rpc/webrtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub struct WebrtcConnectRequestSender {

#[derive(Debug, Serialize, Deserialize, Object, PartialEq, Eq, IntoVecU8, TryFromSliceU8, Clone)]
pub struct WebrtcConnectRequest {
pub session_uuid: Option<u64>,
pub ip_addr: Option<String>,
pub user_agent: Option<String>,
pub version: Option<String>,
pub room: String,
pub peer: String,
Expand Down
3 changes: 3 additions & 0 deletions packages/cluster/src/define/rpc/whep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, Object, PartialEq, Eq, IntoVecU8, TryFromSliceU8, Clone)]
pub struct WhepConnectRequest {
pub session_uuid: u64,
pub ip_addr: String,
pub user_agent: String,
pub token: String,
pub sdp: Option<String>,
pub compressed_sdp: Option<Vec<u8>>,
Expand Down
3 changes: 3 additions & 0 deletions packages/cluster/src/define/rpc/whip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, Object, PartialEq, Eq, IntoVecU8, TryFromSliceU8, Clone)]
pub struct WhipConnectRequest {
pub session_uuid: u64,
pub ip_addr: String,
pub user_agent: String,
pub token: String,
pub sdp: Option<String>,
pub compressed_sdp: Option<Vec<u8>>,
Expand Down
27 changes: 22 additions & 5 deletions packages/cluster/src/implement/endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
use std::collections::HashMap;

use crate::{
generate_cluster_track_uuid, ClusterEndpoint, ClusterEndpointError, ClusterEndpointIncomingEvent, ClusterEndpointOutgoingEvent, ClusterLocalTrackIncomingEvent, ClusterLocalTrackOutgoingEvent,
ClusterRemoteTrackIncomingEvent, ClusterRemoteTrackOutgoingEvent,
generate_cluster_track_uuid,
rpc::{connector::MediaEndpointLogResponse, RPC_MEDIA_ENDPOINT_LOG},
ClusterEndpoint, ClusterEndpointError, ClusterEndpointIncomingEvent, ClusterEndpointOutgoingEvent, ClusterLocalTrackIncomingEvent, ClusterLocalTrackOutgoingEvent, ClusterRemoteTrackIncomingEvent,
ClusterRemoteTrackOutgoingEvent, CONNECTOR_SERVICE,
};
use async_std::channel::{bounded, Receiver, Sender};
use atm0s_sdn::{ChannelUuid, ConsumerRaw, Feedback, FeedbackType, KeyId, KeySource, KeyValueSdk, KeyVersion, LocalSubId, NodeId, NumberInfo, PublisherRaw, PubsubSdk, SubKeyId, ValueType};
use atm0s_sdn::{
ChannelUuid, ConsumerRaw, Feedback, FeedbackType, KeyId, KeySource, KeyValueSdk, KeyVersion, LocalSubId, NodeId, NumberInfo, PublisherRaw, PubsubSdk, RouteRule, RpcEmitter, SubKeyId, ValueType,
};
use bytes::Bytes;
use futures::{select, FutureExt};
use media_utils::hash_str;
use media_utils::{hash_str, ErrorDebugger};
use transport::RequestKeyframeKind;

use super::types::{from_room_value, to_room_key, to_room_value, TrackData};
Expand Down Expand Up @@ -50,10 +54,11 @@ pub struct ClusterEndpointSdn {
peer_sub: HashMap<String, ()>,
track_pub: HashMap<ChannelUuid, (u16, PublisherRaw)>,
remote_track_cached: HashMap<u64, (String, String)>,
rpc_emitter: RpcEmitter,
}

impl ClusterEndpointSdn {
pub(crate) fn new(room_id: &str, peer_id: &str, pubsub_sdk: PubsubSdk, kv_sdk: KeyValueSdk) -> Self {
pub(crate) fn new(room_id: &str, peer_id: &str, pubsub_sdk: PubsubSdk, kv_sdk: KeyValueSdk, rpc_emitter: RpcEmitter) -> Self {
let (kv_tx, kv_rx) = bounded(100);
let (data_tx, data_rx) = bounded(1000);
let (data_fb_tx, data_fb_rx) = bounded(100);
Expand All @@ -77,6 +82,7 @@ impl ClusterEndpointSdn {
peer_sub: Default::default(),
track_pub: Default::default(),
remote_track_cached: Default::default(),
rpc_emitter,
}
}

Expand Down Expand Up @@ -249,6 +255,17 @@ impl ClusterEndpoint for ClusterEndpointSdn {
}
}
}
ClusterEndpointOutgoingEvent::MediaEndpointLog(event) => {
log::info!("[Atm0sClusterEndpoint] log event {:?}", event);
let emitter = self.rpc_emitter.clone();
async_std::task::spawn_local(async move {
emitter
.request::<_, MediaEndpointLogResponse>(CONNECTOR_SERVICE, RouteRule::ToService(0), RPC_MEDIA_ENDPOINT_LOG, event, 5000)
.await
.log_error("Should ok");
});
Ok(())
}
}
}

Expand Down
6 changes: 4 additions & 2 deletions packages/cluster/src/implement/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::Cluster;
use atm0s_sdn::{
convert_enum, KeyValueBehavior, KeyValueBehaviorEvent, KeyValueHandlerEvent, KeyValueSdk, KeyValueSdkEvent, LayersSpreadRouterSyncBehavior, LayersSpreadRouterSyncBehaviorEvent,
LayersSpreadRouterSyncHandlerEvent, ManualBehavior, ManualBehaviorConf, ManualBehaviorEvent, ManualHandlerEvent, NetworkPlane, NetworkPlaneConfig, NodeAddr, NodeAddrBuilder, NodeId, Protocol,
PubsubSdk, PubsubServiceBehaviour, PubsubServiceBehaviourEvent, PubsubServiceHandlerEvent, RpcBox, SharedRouter, SystemTimer, UdpTransport,
PubsubSdk, PubsubServiceBehaviour, PubsubServiceBehaviourEvent, PubsubServiceHandlerEvent, RpcBox, RpcEmitter, SharedRouter, SystemTimer, UdpTransport,
};

use super::{endpoint, rpc::RpcEndpointSdn};
Expand Down Expand Up @@ -39,6 +39,7 @@ pub struct ServerSdn {
join_handler: Option<async_std::task::JoinHandle<()>>,
pubsub_sdk: PubsubSdk,
kv_sdk: KeyValueSdk,
rpc_emitter: RpcEmitter,
}

impl ServerSdn {
Expand Down Expand Up @@ -92,6 +93,7 @@ impl ServerSdn {
pubsub_sdk,
kv_sdk,
join_handler: Some(join_handler),
rpc_emitter: rpc_box.emitter(),
},
RpcEndpointSdn { rpc_box },
)
Expand All @@ -104,7 +106,7 @@ impl Cluster<endpoint::ClusterEndpointSdn> for ServerSdn {
}

fn build(&mut self, room_id: &str, peer_id: &str) -> endpoint::ClusterEndpointSdn {
endpoint::ClusterEndpointSdn::new(room_id, peer_id, self.pubsub_sdk.clone(), self.kv_sdk.clone())
endpoint::ClusterEndpointSdn::new(room_id, peer_id, self.pubsub_sdk.clone(), self.kv_sdk.clone(), self.rpc_emitter.clone())
}
}

Expand Down
Loading
Loading