Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: connector external event log - protobuf #132

Merged
merged 13 commits into from
Jan 5, 2024
Merged
674 changes: 640 additions & 34 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ members = [
"packages/endpoint",
"packages/transport",
"packages/media-utils",
"packages/protocol",
"transports/webrtc",
"transports/rtmp",
"transports/sip",
Expand All @@ -19,4 +20,5 @@ parking_lot = "0.12"
log = { version = "0.4" }
env_logger = { version = "0.10" }
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde = { version = "1.0", features = ["derive"] }
prost = "0.12"
4 changes: 3 additions & 1 deletion packages/cluster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ async-trait = { workspace = true }
serde = { workspace = true }
poem-openapi = { version = "3.0" }
bincode = { version = "1" }
protocol = { package = "atm0s-media-server-protocol", path = "../protocol", version = "0.1.0" }

async-std = { workspace = true, optional = true }
log = { workspace = true, optional = true }
futures = { workspace = true, optional = true }
bytes = { version = "1.5", optional = true }
prost = "0.12"
luongngocminh marked this conversation as resolved.
Show resolved Hide resolved

[features]
default = ["impl"]
impl = ["async-std", "log", "atm0s-sdn", "futures", "bytes"]
impl = ["async-std", "log", "atm0s-sdn", "futures", "bytes"]
9 changes: 5 additions & 4 deletions packages/cluster/src/define/endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use protocol::media_event_logs::MediaEndpointLogEvent;
use transport::TrackId;

use crate::{
rpc::connector::MediaEndpointLogRequest, ClusterEndpointError, ClusterLocalTrackIncomingEvent, ClusterLocalTrackOutgoingEvent, ClusterPeerId, ClusterRemoteTrackIncomingEvent,
ClusterRemoteTrackOutgoingEvent, ClusterTrackMeta, ClusterTrackName, ClusterTrackUuid,
ClusterEndpointError, ClusterLocalTrackIncomingEvent, ClusterLocalTrackOutgoingEvent, ClusterPeerId, ClusterRemoteTrackIncomingEvent, ClusterRemoteTrackOutgoingEvent, ClusterTrackMeta,
ClusterTrackName, ClusterTrackUuid,
};

#[async_trait::async_trait]
Expand All @@ -11,15 +12,15 @@ pub trait ClusterEndpoint: Send + Sync {
async fn recv(&mut self) -> Result<ClusterEndpointIncomingEvent, ClusterEndpointError>;
}

#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq)]
pub enum ClusterEndpointOutgoingEvent {
SubscribeRoom,
UnsubscribeRoom,
SubscribePeer(ClusterPeerId),
UnsubscribePeer(ClusterPeerId),
LocalTrackEvent(TrackId, ClusterLocalTrackOutgoingEvent),
RemoteTrackEvent(TrackId, ClusterTrackUuid, ClusterRemoteTrackOutgoingEvent),
MediaEndpointLog(MediaEndpointLogRequest),
MediaEndpointLog(MediaEndpointLogEvent),
luongngocminh marked this conversation as resolved.
Show resolved Hide resolved
}

#[derive(Clone, PartialEq, Eq, Debug)]
Expand Down
308 changes: 154 additions & 154 deletions packages/cluster/src/define/rpc/connector.rs
Original file line number Diff line number Diff line change
@@ -1,165 +1,165 @@
use std::net::SocketAddr;
// use std::net::SocketAddr;

use atm0s_sdn::NodeId;
use media_utils::F32;
// use atm0s_sdn::NodeId;
luongngocminh marked this conversation as resolved.
Show resolved Hide resolved
// use media_utils::F32;
use proc_macro::{IntoVecU8, TryFromSliceU8};
use serde::{Deserialize, Serialize};
use transport::MediaKind;
// use transport::MediaKind;

#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum MediaStreamIssueType {
Connectivity { mos: F32<2>, lost_percents: F32<2>, jitter_ms: F32<2>, rtt_ms: u32 },
}
// #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
// pub enum MediaStreamIssueType {
// Connectivity { mos: F32<2>, lost_percents: F32<2>, jitter_ms: F32<2>, rtt_ms: u32 },
// }

#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum MediaEndpointEvent {
Routing {
user_agent: String,
gateway_node_id: NodeId,
},
RoutingError {
reason: String,
gateway_node_id: NodeId,
media_node_ids: Vec<NodeId>,
},
Routed {
media_node_id: NodeId,
after_ms: u32,
},
Connecting {
user_agent: String,
remote: Option<SocketAddr>,
},
ConnectError {
remote: Option<SocketAddr>,
error_code: String,
error_message: String,
},
Connected {
after_ms: u32,
remote: Option<SocketAddr>,
},
Reconnecting {
reason: String,
},
Reconnected {
remote: Option<SocketAddr>,
},
Disconnected {
error: Option<String>,
sent_bytes: u64,
received_bytes: u64,
duration_ms: u64,
rtt: F32<2>,
},
SessionStats {
received_bytes: u64,
receive_limit_bitrate: u32,
sent_bytes: u64,
send_est_bitrate: u32,
rtt: u16,
},
}
// #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
// pub enum MediaEndpointEvent {
// Routing {
// user_agent: String,
// gateway_node_id: NodeId,
// },
// RoutingError {
// reason: String,
// gateway_node_id: NodeId,
// media_node_ids: Vec<NodeId>,
// },
// Routed {
// media_node_id: NodeId,
// after_ms: u32,
// },
// Connecting {
// user_agent: String,
// remote: Option<SocketAddr>,
// },
// ConnectError {
// remote: Option<SocketAddr>,
// error_code: String,
// error_message: String,
// },
// Connected {
// after_ms: u32,
// remote: Option<SocketAddr>,
// },
// Reconnecting {
// reason: String,
// },
// Reconnected {
// remote: Option<SocketAddr>,
// },
// Disconnected {
// error: Option<String>,
// sent_bytes: u64,
// received_bytes: u64,
// duration_ms: u64,
// rtt: F32<2>,
// },
// SessionStats {
// received_bytes: u64,
// receive_limit_bitrate: u32,
// sent_bytes: u64,
// send_est_bitrate: u32,
// rtt: u16,
// },
// }

#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum MediaReceiveStreamEvent {
StreamStarted {
name: String,
kind: MediaKind,
remote_peer: String,
remote_stream: String,
},
StreamIssue {
name: String,
kind: MediaKind,
remote_peer: String,
remote_stream: String,
issue: MediaStreamIssueType,
},
StreamStats {
name: String,
kind: MediaKind,
limit_bitrate: u32,
received_bytes: u64,
freeze: bool,
mos: Option<F32<2>>,
rtt: Option<u32>,
jitter: Option<F32<2>>,
lost: Option<F32<2>>,
},
StreamEnded {
name: String,
kind: MediaKind,
sent_bytes: u64,
freeze_count: u32,
duration_ms: u64,
mos: Option<(F32<2>, F32<2>, F32<2>)>,
rtt: Option<(F32<2>, F32<2>, F32<2>)>,
jitter: Option<(F32<2>, F32<2>, F32<2>)>,
lost: Option<(F32<2>, F32<2>, F32<2>)>,
},
}
// #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
// pub enum MediaReceiveStreamEvent {
// StreamStarted {
// name: String,
// kind: MediaKind,
// remote_peer: String,
// remote_stream: String,
// },
// StreamIssue {
// name: String,
// kind: MediaKind,
// remote_peer: String,
// remote_stream: String,
// issue: MediaStreamIssueType,
// },
// StreamStats {
// name: String,
// kind: MediaKind,
// limit_bitrate: u32,
// received_bytes: u64,
// freeze: bool,
// mos: Option<F32<2>>,
// rtt: Option<u32>,
// jitter: Option<F32<2>>,
// lost: Option<F32<2>>,
// },
// StreamEnded {
// name: String,
// kind: MediaKind,
// sent_bytes: u64,
// freeze_count: u32,
// duration_ms: u64,
// mos: Option<(F32<2>, F32<2>, F32<2>)>,
// rtt: Option<(F32<2>, F32<2>, F32<2>)>,
// jitter: Option<(F32<2>, F32<2>, F32<2>)>,
// lost: Option<(F32<2>, F32<2>, F32<2>)>,
// },
// }

#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum MediaSendStreamEvent {
StreamStarted {
name: String,
kind: MediaKind,
meta: String,
scaling: String,
},
StreamIssue {
name: String,
kind: MediaKind,
issue: MediaStreamIssueType,
},
StreamStats {
name: String,
kind: MediaKind,
sent_bytes: u64,
freeze: bool,
mos: Option<F32<2>>,
rtt: Option<u32>,
jitter: Option<F32<2>>,
lost: Option<F32<2>>,
},
StreamEnded {
name: String,
kind: MediaKind,
received_bytes: u64,
duration_ms: u64,
freeze_count: u32,
mos: Option<(F32<2>, F32<2>, F32<2>)>,
rtt: Option<(F32<2>, F32<2>, F32<2>)>,
jitter: Option<(F32<2>, F32<2>, F32<2>)>,
lost: Option<(F32<2>, F32<2>, F32<2>)>,
},
}
// #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
// pub enum MediaSendStreamEvent {
// StreamStarted {
// name: String,
// kind: MediaKind,
// meta: String,
// scaling: String,
// },
// StreamIssue {
// name: String,
// kind: MediaKind,
// issue: MediaStreamIssueType,
// },
// StreamStats {
// name: String,
// kind: MediaKind,
// sent_bytes: u64,
// freeze: bool,
// mos: Option<F32<2>>,
// rtt: Option<u32>,
// jitter: Option<F32<2>>,
// lost: Option<F32<2>>,
// },
// StreamEnded {
// name: String,
// kind: MediaKind,
// received_bytes: u64,
// duration_ms: u64,
// freeze_count: u32,
// mos: Option<(F32<2>, F32<2>, F32<2>)>,
// rtt: Option<(F32<2>, F32<2>, F32<2>)>,
// jitter: Option<(F32<2>, F32<2>, F32<2>)>,
// lost: Option<(F32<2>, F32<2>, F32<2>)>,
// },
// }

#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, IntoVecU8, TryFromSliceU8)]
pub enum MediaEndpointLogRequest {
SessionEvent {
ip: String,
version: Option<String>,
location: Option<(F32<2>, F32<2>)>,
token: Vec<u8>,
ts: u64,
session_uuid: u64,
event: MediaEndpointEvent,
},
ReceiveStreamEvent {
token: Vec<u8>,
ts: u64,
session_uuid: u64,
event: MediaReceiveStreamEvent,
},
SendStreamEvent {
token: Vec<u8>,
ts: u64,
session_uuid: u64,
event: MediaSendStreamEvent,
},
}
// #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, IntoVecU8, TryFromSliceU8)]
// pub enum MediaEndpointLogRequest {
// SessionEvent {
// ip: String,
// version: Option<String>,
// location: Option<(F32<2>, F32<2>)>,
// token: Vec<u8>,
// ts: u64,
// session_uuid: u64,
// event: MediaEndpointEvent,
// },
// ReceiveStreamEvent {
// token: Vec<u8>,
// ts: u64,
// session_uuid: u64,
// event: MediaReceiveStreamEvent,
// },
// SendStreamEvent {
// token: Vec<u8>,
// ts: u64,
// session_uuid: u64,
// event: MediaSendStreamEvent,
// },
// }

#[derive(PartialEq, Clone, Debug, Serialize, Deserialize, IntoVecU8, TryFromSliceU8)]
pub struct MediaEndpointLogResponse {}
1 change: 1 addition & 0 deletions packages/endpoint/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ description = "Media Endpoint for atm0s-media-server"
cluster = { package = "atm0s-media-server-cluster", path = "../cluster", version = "0.1.0" }
transport = { package = "atm0s-media-server-transport", path = "../transport", version = "0.1.0" }
media-utils = { package = "atm0s-media-server-utils", path = "../media-utils", version = "0.1.0" }
protocol = { package = "atm0s-media-server-protocol", path = "../protocol", version = "0.1.0" }
log = { workspace = true }
async-std = { workspace = true }
serde = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion packages/endpoint/src/endpoint_wrap/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub enum MediaEndpointInternalEvent {
UnsubscribePeer(String),
}

#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq)]
pub enum MediaInternalAction {
Internal(MediaEndpointInternalEvent),
Endpoint(TransportOutgoingEvent<EndpointRpcOut, RemoteTrackRpcOut, LocalTrackRpcOut>),
Expand Down