Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: connector external event log - protobuf #132

Merged
merged 13 commits into from
Jan 5, 2024
Merged
618 changes: 587 additions & 31 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ members = [
"packages/transport",
"packages/audio-mixer",
"packages/media-utils",
"packages/protocol",
"transports/webrtc",
"transports/rtmp",
"transports/sip",
Expand All @@ -20,4 +21,5 @@ parking_lot = "0.12"
log = { version = "0.4" }
env_logger = { version = "0.10" }
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde = { version = "1.0", features = ["derive"] }
prost = "0.12"
1 change: 1 addition & 0 deletions packages/cluster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ async-trait = { workspace = true }
serde = { workspace = true }
poem-openapi = { version = "3.0" }
bincode = { version = "1" }
protocol = { package = "atm0s-media-server-protocol", path = "../protocol", version = "0.1.0" }

async-std = { workspace = true, optional = true }
log = { workspace = true, optional = true }
Expand Down
7 changes: 4 additions & 3 deletions packages/cluster/src/define/endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use protocol::media_event_logs::MediaEndpointLogRequest;
use transport::TrackId;

use crate::{
rpc::connector::MediaEndpointLogRequest, ClusterEndpointError, ClusterLocalTrackIncomingEvent, ClusterLocalTrackOutgoingEvent, ClusterPeerId, ClusterRemoteTrackIncomingEvent,
ClusterRemoteTrackOutgoingEvent, ClusterTrackMeta, ClusterTrackName, ClusterTrackUuid,
ClusterEndpointError, ClusterLocalTrackIncomingEvent, ClusterLocalTrackOutgoingEvent, ClusterPeerId, ClusterRemoteTrackIncomingEvent, ClusterRemoteTrackOutgoingEvent, ClusterTrackMeta,
ClusterTrackName, ClusterTrackUuid,
};

#[async_trait::async_trait]
Expand All @@ -11,7 +12,7 @@ pub trait ClusterEndpoint: Send + Sync {
async fn recv(&mut self) -> Result<ClusterEndpointIncomingEvent, ClusterEndpointError>;
}

#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq)]
pub enum ClusterEndpointOutgoingEvent {
SubscribeRoom,
UnsubscribeRoom,
Expand Down
160 changes: 0 additions & 160 deletions packages/cluster/src/define/rpc/connector.rs
Original file line number Diff line number Diff line change
@@ -1,165 +1,5 @@
use std::net::SocketAddr;

use atm0s_sdn::NodeId;
use media_utils::F32;
use proc_macro::{IntoVecU8, TryFromSliceU8};
use serde::{Deserialize, Serialize};
use transport::MediaKind;

#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum MediaStreamIssueType {
Connectivity { mos: F32<2>, lost_percents: F32<2>, jitter_ms: F32<2>, rtt_ms: u32 },
}

#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum MediaEndpointEvent {
Routing {
user_agent: String,
gateway_node_id: NodeId,
},
RoutingError {
reason: String,
gateway_node_id: NodeId,
media_node_ids: Vec<NodeId>,
},
Routed {
media_node_id: NodeId,
after_ms: u32,
},
Connecting {
user_agent: String,
remote: Option<SocketAddr>,
},
ConnectError {
remote: Option<SocketAddr>,
error_code: String,
error_message: String,
},
Connected {
after_ms: u32,
remote: Option<SocketAddr>,
},
Reconnecting {
reason: String,
},
Reconnected {
remote: Option<SocketAddr>,
},
Disconnected {
error: Option<String>,
sent_bytes: u64,
received_bytes: u64,
duration_ms: u64,
rtt: F32<2>,
},
SessionStats {
received_bytes: u64,
receive_limit_bitrate: u32,
sent_bytes: u64,
send_est_bitrate: u32,
rtt: u16,
},
}

#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum MediaReceiveStreamEvent {
StreamStarted {
name: String,
kind: MediaKind,
remote_peer: String,
remote_stream: String,
},
StreamIssue {
name: String,
kind: MediaKind,
remote_peer: String,
remote_stream: String,
issue: MediaStreamIssueType,
},
StreamStats {
name: String,
kind: MediaKind,
limit_bitrate: u32,
received_bytes: u64,
freeze: bool,
mos: Option<F32<2>>,
rtt: Option<u32>,
jitter: Option<F32<2>>,
lost: Option<F32<2>>,
},
StreamEnded {
name: String,
kind: MediaKind,
sent_bytes: u64,
freeze_count: u32,
duration_ms: u64,
mos: Option<(F32<2>, F32<2>, F32<2>)>,
rtt: Option<(F32<2>, F32<2>, F32<2>)>,
jitter: Option<(F32<2>, F32<2>, F32<2>)>,
lost: Option<(F32<2>, F32<2>, F32<2>)>,
},
}

#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum MediaSendStreamEvent {
StreamStarted {
name: String,
kind: MediaKind,
meta: String,
scaling: String,
},
StreamIssue {
name: String,
kind: MediaKind,
issue: MediaStreamIssueType,
},
StreamStats {
name: String,
kind: MediaKind,
sent_bytes: u64,
freeze: bool,
mos: Option<F32<2>>,
rtt: Option<u32>,
jitter: Option<F32<2>>,
lost: Option<F32<2>>,
},
StreamEnded {
name: String,
kind: MediaKind,
received_bytes: u64,
duration_ms: u64,
freeze_count: u32,
mos: Option<(F32<2>, F32<2>, F32<2>)>,
rtt: Option<(F32<2>, F32<2>, F32<2>)>,
jitter: Option<(F32<2>, F32<2>, F32<2>)>,
lost: Option<(F32<2>, F32<2>, F32<2>)>,
},
}

#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, IntoVecU8, TryFromSliceU8)]
pub enum MediaEndpointLogRequest {
SessionEvent {
ip: String,
version: Option<String>,
location: Option<(F32<2>, F32<2>)>,
token: Vec<u8>,
ts: u64,
session_uuid: u64,
event: MediaEndpointEvent,
},
ReceiveStreamEvent {
token: Vec<u8>,
ts: u64,
session_uuid: u64,
event: MediaReceiveStreamEvent,
},
SendStreamEvent {
token: Vec<u8>,
ts: u64,
session_uuid: u64,
event: MediaSendStreamEvent,
},
}

#[derive(PartialEq, Clone, Debug, Serialize, Deserialize, IntoVecU8, TryFromSliceU8)]
pub struct MediaEndpointLogResponse {}
1 change: 1 addition & 0 deletions packages/endpoint/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ description = "Media Endpoint for atm0s-media-server"
cluster = { package = "atm0s-media-server-cluster", path = "../cluster", version = "0.1.0" }
transport = { package = "atm0s-media-server-transport", path = "../transport", version = "0.1.0" }
media-utils = { package = "atm0s-media-server-utils", path = "../media-utils", version = "0.1.0" }
protocol = { package = "atm0s-media-server-protocol", path = "../protocol", version = "0.1.0" }
audio-mixer = { package = "atm0s-media-server-audio-mixer", path = "../audio-mixer", version = "0.1.0" }
log = { workspace = true }
async-std = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion packages/endpoint/src/endpoint_wrap/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub enum MediaEndpointInternalEvent {
ConnectionError(TransportError),
}

#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq)]
pub enum MediaInternalAction {
Internal(MediaEndpointInternalEvent),
Endpoint(TransportOutgoingEvent<EndpointRpcOut, RemoteTrackRpcOut, LocalTrackRpcOut>),
Expand Down
2 changes: 1 addition & 1 deletion packages/endpoint/src/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
pub mod logger;
pub mod mix_minus;

#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq)]
pub enum MediaEndpointMiddlewareOutput {
Endpoint(TransportOutgoingEvent<EndpointRpcOut, RemoteTrackRpcOut, LocalTrackRpcOut>),
Cluster(ClusterEndpointOutgoingEvent),
Expand Down
67 changes: 36 additions & 31 deletions packages/endpoint/src/middleware/logger.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::collections::VecDeque;

use cluster::rpc::connector::{MediaEndpointEvent, MediaEndpointLogRequest};
use media_utils::F32;
use protocol::media_event_logs::{
session_event::{self, SessionConnectError, SessionConnected, SessionConnecting, SessionDisconnected, SessionReconnected, SessionReconnecting},
MediaEndpointLogEvent, MediaEndpointLogRequest, F32p2, SessionEvent,
};
use transport::{TransportError, TransportIncomingEvent, TransportStateEvent};

use crate::{MediaEndpointMiddleware, MediaEndpointMiddlewareOutput};
Expand All @@ -19,17 +21,20 @@ impl MediaEndpointEventLogger {
}
}

fn build_event(&self, now_ms: u64, event: MediaEndpointEvent) -> MediaEndpointMiddlewareOutput {
fn build_event(&self, now_ms: u64, event: session_event::Event) -> MediaEndpointMiddlewareOutput {
log::info!("sending event out to connector {:?}", event);
let event = MediaEndpointLogRequest::SessionEvent {
ip: "127.0.0.1".to_string(), //TODO
version: None,
location: None,
token: vec![],
ts: now_ms,
session_uuid: 0, //TODO
event,
let event = MediaEndpointLogRequest {
event: Some(MediaEndpointLogEvent::SessionEvent(SessionEvent {
ip: "127.0.0.1".to_string(), //TODO
version: None,
location: None,
token: vec![],
ts: now_ms,
session_uuid: 0, //TODO
event: Some(event),
})),
};

MediaEndpointMiddlewareOutput::Cluster(cluster::ClusterEndpointOutgoingEvent::MediaEndpointLog(event))
}
}
Expand All @@ -39,10 +44,10 @@ impl MediaEndpointMiddleware for MediaEndpointEventLogger {
self.started_ms = Some(now_ms);
self.outputs.push_back(self.build_event(
now_ms,
MediaEndpointEvent::Connecting {
session_event::Event::Connecting(SessionConnecting {
user_agent: "TODO".to_string(), //TODO
remote: None, //TODO
},
}),
));
}

Expand All @@ -55,38 +60,38 @@ impl MediaEndpointMiddleware for MediaEndpointEventLogger {
TransportStateEvent::Connected => {
self.outputs.push_back(self.build_event(
now_ms,
MediaEndpointEvent::Connected {
session_event::Event::Connected(SessionConnected {
after_ms: (now_ms - self.started_ms.expect("Should has started")) as u32,
remote: None, //TODO
},
}),
));
}
TransportStateEvent::Reconnecting => {
self.outputs.push_back(self.build_event(
now_ms,
MediaEndpointEvent::Reconnecting {
session_event::Event::Reconnecting(SessionReconnecting {
reason: "TODO".to_string(), //TODO
},
}),
));
}
TransportStateEvent::Reconnected => {
self.outputs.push_back(self.build_event(
now_ms,
MediaEndpointEvent::Reconnected {
session_event::Event::Reconnected(SessionReconnected{
remote: None, //TODO
},
}),
));
}
TransportStateEvent::Disconnected => {
self.outputs.push_back(self.build_event(
now_ms,
MediaEndpointEvent::Disconnected {
session_event::Event::Disconnected(SessionDisconnected {
error: None,
duration_ms: now_ms - self.started_ms.expect("Should has started"),
received_bytes: 0, //TODO
rtt: F32::new(0.0), //TODO
sent_bytes: 0, //TODO
},
received_bytes: 0, //TODO
rtt: Some(F32p2 { value: 0 }), //TODO
sent_bytes: 0, //TODO
}),
));
}
},
Expand All @@ -101,23 +106,23 @@ impl MediaEndpointMiddleware for MediaEndpointEventLogger {
TransportError::ConnectError(_) => {
self.outputs.push_back(self.build_event(
now_ms,
MediaEndpointEvent::ConnectError {
session_event::Event::ConnectError(SessionConnectError {
remote: None, //TODO
error_code: "TODO".to_string(), //TODO
error_message: "TODO".to_string(), //TODO
},
}),
));
}
TransportError::ConnectionError(_) => {
self.outputs.push_back(self.build_event(
now_ms,
MediaEndpointEvent::Disconnected {
session_event::Event::Disconnected(SessionDisconnected {
error: Some("TIMEOUT".to_string()), //TODO
duration_ms: now_ms - self.started_ms.expect("Should has started"),
received_bytes: 0, //TODO
rtt: F32::new(0.0), //TODO
sent_bytes: 0, //TODO
},
received_bytes: 0, //TODO
rtt: Some(F32p2 { value: 0 }), //TODO
sent_bytes: 0,
}),
));
}
_ => {}
Expand Down
15 changes: 15 additions & 0 deletions packages/protocol/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "atm0s-media-server-protocol"
version = "0.1.0"
edition = "2021"
description = "Cluster Protobuf definitions for atm0s-media-server"
license = "MIT"
build = "build.rs"

luongngocminh marked this conversation as resolved.
Show resolved Hide resolved
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
prost = { workspace = true }

[build-dependencies]
prost-build = "0.12"
10 changes: 10 additions & 0 deletions packages/protocol/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
extern crate prost_build;

fn main() {
let mut config = prost_build::Config::new();
// config.type_attribute(".", "");
// config.type_attribute(".receive_stream_event", "#[derive(Eq)]");
// config.type_attribute(".send_stream_event", "#[derive(Eq)]");
config.compile_protos(&["src/atm0s.proto"],
&["src/"]).unwrap();
}
Loading
Loading