diff --git a/.github/workflows/release-plz.yml b/.github/workflows/release-plz.yml index d9cadc2f..274dea4f 100644 --- a/.github/workflows/release-plz.yml +++ b/.github/workflows/release-plz.yml @@ -23,7 +23,7 @@ jobs: uses: dtolnay/rust-toolchain@stable - name: Install dev-tools - run: sudo apt-get install -y --no-install-recommends pkg-config musl-dev musl-tools + run: sudo apt-get install -y --no-install-recommends pkg-config musl-dev musl-tools protobuf-compiler - name: Install deps run: sudo apt-get install -y --no-install-recommends libssl-dev libopus-dev libfdk-aac-dev libsoxr-dev diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index d86647d5..f38a63c6 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -150,7 +150,7 @@ jobs: - name: Install dev-tools if: matrix.os == 'ubuntu-latest' - run: sudo apt-get install -y --no-install-recommends pkg-config musl-dev musl-tools + run: sudo apt-get install -y --no-install-recommends pkg-config musl-dev musl-tools protobuf-compiler - name: Patch some libs run: | diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 5039686a..c336ad9a 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -19,7 +19,7 @@ jobs: - name: Install deps run: | sudo apt-get update - sudo apt install -y libsoxr-dev libopus-dev libssl-dev libfdk-aac-dev + sudo apt install -y libsoxr-dev libopus-dev libssl-dev libfdk-aac-dev protobuf-compiler - name: Install Rust run: rustup update stable - name: Install cargo-llvm-cov @@ -62,4 +62,4 @@ jobs: with: command: check log-level: error - arguments: --all-features --target ${{ matrix.platform }} \ No newline at end of file + arguments: --all-features --target ${{ matrix.platform }} diff --git a/Cargo.lock b/Cargo.lock index 54f4312a..c38670f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -135,7 +135,7 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b" dependencies = [ - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -145,9 +145,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0699d10d2f4d628a98ee7b57b289abbc98ff3bad977cb3152709d4bf2330628" dependencies = [ "anstyle", - "windows-sys", + "windows-sys 0.48.0", ] +[[package]] +name = "anyhow" +version = "1.0.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca" + [[package]] name = "arc-swap" version = "1.6.0" @@ -276,7 +282,7 @@ dependencies = [ "slab", "tracing", "waker-fn", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -323,7 +329,7 @@ dependencies = [ "event-listener 3.1.0", "futures-lite 1.13.0", "rustix 0.38.25", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -341,7 +347,7 @@ dependencies = [ "rustix 0.38.25", "signal-hook-registry", "slab", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -417,6 +423,7 @@ dependencies = [ "async-trait", "atm0s-media-server-cluster", "atm0s-media-server-endpoint", + "atm0s-media-server-protocol", "atm0s-media-server-transport", "atm0s-media-server-transport-rtmp", "atm0s-media-server-transport-sip", @@ -427,12 +434,15 @@ dependencies = [ "log", "metrics", "metrics-dashboard", + "nats", "parking_lot", "poem", "poem-openapi", + "prost", "rsip", "rust-embed", "serde", + "serde_json", "tracing-subscriber", ] @@ -451,6 +461,7 @@ dependencies = [ "async-std", "async-trait", "atm0s-media-server-proc-macro", + "atm0s-media-server-protocol", "atm0s-media-server-transport", "atm0s-media-server-utils", "atm0s-sdn", @@ -472,6 +483,7 @@ dependencies = [ "async-std", "atm0s-media-server-audio-mixer", "atm0s-media-server-cluster", + "atm0s-media-server-protocol", "atm0s-media-server-transport", "atm0s-media-server-utils", "futures", @@ -487,6 +499,14 @@ dependencies = [ "syn 2.0.42", ] +[[package]] +name = "atm0s-media-server-protocol" +version = "0.1.0" +dependencies = [ + "prost", + "prost-build", +] + [[package]] name = "atm0s-media-server-transport" version = "0.1.0" @@ -861,6 +881,21 @@ version = "0.21.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" +[[package]] +name = "base64-url" +version = "1.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67a99c239d0c7e77c85dddfa9cebce48704b3c49550fcd3b84dd637e4484899f" +dependencies = [ + "base64 0.13.1", +] + +[[package]] +name = "base64ct" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" + [[package]] name = "bincode" version = "1.3.3" @@ -1038,7 +1073,7 @@ dependencies = [ "js-sys", "num-traits", "wasm-bindgen", - "windows-targets", + "windows-targets 0.48.5", ] [[package]] @@ -1153,6 +1188,12 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "const-oid" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" + [[package]] name = "convert-enum" version = "0.1.0" @@ -1187,6 +1228,16 @@ dependencies = [ "version_check", ] +[[package]] +name = "core-foundation" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.4" @@ -1271,6 +1322,16 @@ dependencies = [ "itertools 0.10.5", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82a9b73a36529d9c47029b9fb3a6f0ea3cc916a261195352ba19e770fc1748b2" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.8.3" @@ -1297,9 +1358,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.16" +version = "0.8.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" +checksum = "c3a430a770ebd84726f584a90ee7f020d28db52c6d02138900f22341f866d39c" dependencies = [ "cfg-if", ] @@ -1343,6 +1404,7 @@ dependencies = [ "cfg-if", "cpufeatures", "curve25519-dalek-derive", + "digest 0.10.7", "fiat-crypto", "platforms", "rustc_version", @@ -1457,6 +1519,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "der" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fffa369a668c8af7dbf8b5e56c9f744fbd399949ed171606040001947de40b1c" +dependencies = [ + "const-oid", + "pem-rfc7468", + "zeroize", +] + [[package]] name = "deranged" version = "0.3.9" @@ -1464,6 +1537,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0f32d04922c60427da6f9fef14d042d9edddef64cb9d4ce0d64d0685fbeb1fd3" dependencies = [ "powerfmt", + "serde", ] [[package]] @@ -1505,6 +1579,28 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" +[[package]] +name = "ed25519" +version = "2.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" +dependencies = [ + "signature", +] + +[[package]] +name = "ed25519-dalek" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f628eaec48bfd21b865dc2950cfa014450c01d2fa2b69a86c2fd5844ec523c0" +dependencies = [ + "curve25519-dalek", + "ed25519", + "sha2 0.10.8", + "signature", + "subtle", +] + [[package]] name = "either" version = "1.9.0" @@ -1552,7 +1648,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f258a7194e7f7c2a7837a8913aeab7fd8c383457034fa20ce4dd3dcb813e8eb8" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -1643,6 +1739,12 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27573eac26f4dd11e2b1916c3fe1baa56407c83c71a773a8ba17ec0bca03b6b7" +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "flate2" version = "1.0.28" @@ -2003,6 +2105,15 @@ dependencies = [ "digest 0.10.7", ] +[[package]] +name = "home" +version = "0.5.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "http" version = "0.2.11" @@ -2152,7 +2263,7 @@ checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" dependencies = [ "hermit-abi", "libc", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -2169,7 +2280,7 @@ checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" dependencies = [ "hermit-abi", "rustix 0.38.25", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -2214,6 +2325,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "json" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "078e285eafdfb6c4b434e0d31e8cfcb5115b651496faca5749b88fafd4f23bfd" + [[package]] name = "jwt" version = "0.16.0" @@ -2307,7 +2424,7 @@ dependencies = [ "libc", "neli", "thiserror", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -2504,7 +2621,7 @@ checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0" dependencies = [ "libc", "wasi", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -2548,7 +2665,7 @@ dependencies = [ "log", "memchr", "mime", - "spin", + "spin 0.9.8", "tokio", "version_check", ] @@ -2593,6 +2710,48 @@ dependencies = [ "unsigned-varint", ] +[[package]] +name = "multimap" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" + +[[package]] +name = "nats" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e008b4960823570e06c8804a5c2c91458b91cc1708d5e21095c5e6de3fb92f5" +dependencies = [ + "base64 0.13.1", + "base64-url", + "blocking", + "crossbeam-channel", + "fastrand 1.9.0", + "itoa", + "json", + "lazy_static", + "libc", + "log", + "memchr", + "nkeys", + "nuid", + "once_cell", + "parking_lot", + "regex", + "ring 0.16.20", + "rustls", + "rustls-native-certs", + "rustls-pemfile", + "rustls-webpki 0.100.3", + "serde", + "serde_json", + "serde_nanos", + "serde_repr", + "time", + "url", + "winapi", +] + [[package]] name = "neli" version = "0.6.4" @@ -2651,6 +2810,22 @@ dependencies = [ "libc", ] +[[package]] +name = "nkeys" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aad178aad32087b19042ee36dfd450b73f5f934fbfb058b59b198684dfec4c47" +dependencies = [ + "byteorder", + "data-encoding", + "ed25519", + "ed25519-dalek", + "getrandom", + "log", + "rand 0.8.5", + "signatory", +] + [[package]] name = "nom" version = "7.1.3" @@ -2680,6 +2855,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "nuid" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20c1bb65186718d348306bf1afdeb20d9ab45b2ab80fb793c0fdcf59ffbb4f38" +dependencies = [ + "lazy_static", + "rand 0.8.5", +] + [[package]] name = "num-traits" version = "0.2.17" @@ -2752,6 +2937,12 @@ dependencies = [ "syn 2.0.42", ] +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + [[package]] name = "openssl-src" version = "300.1.6+3.1.4" @@ -2835,7 +3026,7 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "windows-targets", + "windows-targets 0.48.5", ] [[package]] @@ -2844,12 +3035,31 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" +[[package]] +name = "pem-rfc7468" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412" +dependencies = [ + "base64ct", +] + [[package]] name = "percent-encoding" version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "petgraph" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" +dependencies = [ + "fixedbitset", + "indexmap 2.1.0", +] + [[package]] name = "pin-project-lite" version = "0.2.13" @@ -2873,6 +3083,16 @@ dependencies = [ "futures-io", ] +[[package]] +name = "pkcs8" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" +dependencies = [ + "der", + "spki", +] + [[package]] name = "pkg-config" version = "0.3.27" @@ -3024,7 +3244,7 @@ dependencies = [ "libc", "log", "pin-project-lite", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -3038,7 +3258,7 @@ dependencies = [ "pin-project-lite", "rustix 0.38.25", "tracing", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -3109,6 +3329,16 @@ dependencies = [ "termtree", ] +[[package]] +name = "prettyplease" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae005bd773ab59b4725093fd7df83fd7892f7d8eafb48dbd7de6e024e4215f9d" +dependencies = [ + "proc-macro2", + "syn 2.0.42", +] + [[package]] name = "proc-macro-crate" version = "2.0.0" @@ -3142,6 +3372,60 @@ dependencies = [ "thiserror", ] +[[package]] +name = "prost" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "146c289cda302b98a28d40c8b3b90498d6e526dd24ac2ecea73e4e491685b94a" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c55e02e35260070b6f716a2423c2ff1c3bb1642ddca6f99e1f26d06268a0e2d2" +dependencies = [ + "bytes", + "heck", + "itertools 0.11.0", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn 2.0.42", + "tempfile", + "which", +] + +[[package]] +name = "prost-derive" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e" +dependencies = [ + "anyhow", + "itertools 0.11.0", + "proc-macro2", + "quote", + "syn 2.0.42", +] + +[[package]] +name = "prost-types" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "193898f59edcf43c26227dcd4c8427f00d99d61e95dcde58dabd49fa291d470e" +dependencies = [ + "prost", +] + [[package]] name = "protobuf" version = "2.28.0" @@ -3370,6 +3654,35 @@ dependencies = [ "uncased", ] +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin 0.5.2", + "untrusted 0.7.1", + "web-sys", + "winapi", +] + +[[package]] +name = "ring" +version = "0.17.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "688c63d65483050968b2a8937f7995f443e27041a0f7700aa59b0822aedebb74" +dependencies = [ + "cc", + "getrandom", + "libc", + "spin 0.9.8", + "untrusted 0.9.0", + "windows-sys 0.48.0", +] + [[package]] name = "rml_amf0" version = "0.3.0" @@ -3507,7 +3820,7 @@ dependencies = [ "io-lifetimes", "libc", "linux-raw-sys 0.3.8", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -3520,7 +3833,60 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.4.11", - "windows-sys", + "windows-sys 0.48.0", +] + +[[package]] +name = "rustls" +version = "0.21.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" +dependencies = [ + "log", + "ring 0.17.7", + "rustls-webpki 0.101.7", + "sct", +] + +[[package]] +name = "rustls-native-certs" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" +dependencies = [ + "base64 0.21.5", +] + +[[package]] +name = "rustls-webpki" +version = "0.100.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f6a5fc258f1c1276dfe3016516945546e2d5383911efc0fc4f1cdc5df3a4ae3" +dependencies = [ + "ring 0.16.20", + "untrusted 0.7.1", +] + +[[package]] +name = "rustls-webpki" +version = "0.101.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +dependencies = [ + "ring 0.17.7", + "untrusted 0.9.0", ] [[package]] @@ -3538,12 +3904,31 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "schannel" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbc91545643bcf3a0bbb6569265615222618bdf33ce4ffbbd13c4bbd4c093534" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "scopeguard" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sct" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" +dependencies = [ + "ring 0.17.7", + "untrusted 0.9.0", +] + [[package]] name = "sctp-proto" version = "0.1.6" @@ -3582,6 +3967,29 @@ dependencies = [ "syn 2.0.42", ] +[[package]] +name = "security-framework" +version = "2.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05b64fb303737d99b81884b2c63433e9ae28abebe5eb5045dcdd175dc2ecf4de" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e932934257d3b408ed8f30db49d85ea163bfe74961f017f405b025af298f0c7a" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "semver" version = "1.0.20" @@ -3619,6 +4027,26 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_nanos" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ae801b7733ca8d6a2b580debe99f67f36826a0f5b8a36055dc6bc40f8d6bc71" +dependencies = [ + "serde", +] + +[[package]] +name = "serde_repr" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3081f5ffbb02284dda55132aa26daecedd7372a42417bbbab6f14ab7d6bb9145" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.42", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -3718,6 +4146,28 @@ dependencies = [ "libc", ] +[[package]] +name = "signatory" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1e303f8205714074f6068773f0e29527e0453937fe837c9717d066635b65f31" +dependencies = [ + "pkcs8", + "rand_core 0.6.4", + "signature", + "zeroize", +] + +[[package]] +name = "signature" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" +dependencies = [ + "digest 0.10.7", + "rand_core 0.6.4", +] + [[package]] name = "sketches-ddsketch" version = "0.2.1" @@ -3783,7 +4233,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -3792,12 +4242,28 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c6734caf0b6f51addd5eeacca12fb39b2c6c14e8d4f3ac42f3a78955c0467458" +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + [[package]] name = "spin" version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "spki" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d" +dependencies = [ + "base64ct", + "der", +] + [[package]] name = "static_assertions" version = "1.1.0" @@ -3875,7 +4341,7 @@ dependencies = [ "fastrand 2.0.1", "redox_syscall", "rustix 0.38.25", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -3993,7 +4459,7 @@ dependencies = [ "signal-hook-registry", "socket2 0.5.5", "tokio-macros", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -4232,6 +4698,18 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6889a77d49f1f013504cec6bf97a2c730394adedaeb1deb5ea08949a50541105" +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "url" version = "2.5.0" @@ -4416,6 +4894,18 @@ dependencies = [ "winapi", ] +[[package]] +name = "which" +version = "4.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7" +dependencies = [ + "either", + "home", + "once_cell", + "rustix 0.38.25", +] + [[package]] name = "wildmatch" version = "2.1.1" @@ -4459,7 +4949,7 @@ version = "0.51.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1f8cf84f35d2db49a46868f947758c7a1138116f7fac3bc844f43ade1292e64" dependencies = [ - "windows-targets", + "windows-targets 0.48.5", ] [[package]] @@ -4468,7 +4958,16 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" dependencies = [ - "windows-targets", + "windows-targets 0.48.5", +] + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.0", ] [[package]] @@ -4477,13 +4976,28 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + +[[package]] +name = "windows-targets" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd" +dependencies = [ + "windows_aarch64_gnullvm 0.52.0", + "windows_aarch64_msvc 0.52.0", + "windows_i686_gnu 0.52.0", + "windows_i686_msvc 0.52.0", + "windows_x86_64_gnu 0.52.0", + "windows_x86_64_gnullvm 0.52.0", + "windows_x86_64_msvc 0.52.0", ] [[package]] @@ -4492,42 +5006,84 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" + [[package]] name = "windows_i686_gnu" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" +[[package]] +name = "windows_i686_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" + [[package]] name = "windows_i686_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" +[[package]] +name = "windows_i686_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" + [[package]] name = "winnow" version = "0.5.19" diff --git a/Cargo.toml b/Cargo.toml index 7c5b2c4b..e7a3faf0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "packages/transport", "packages/audio-mixer", "packages/media-utils", + "packages/protocol", "transports/webrtc", "transports/rtmp", "transports/sip", @@ -20,4 +21,5 @@ parking_lot = "0.12" log = { version = "0.4" } env_logger = { version = "0.10" } serde_json = "1.0" -serde = { version = "1.0", features = ["derive"] } \ No newline at end of file +serde = { version = "1.0", features = ["derive"] } +prost = "0.12" diff --git a/packages/cluster/Cargo.toml b/packages/cluster/Cargo.toml index 4d21c520..c17304e7 100644 --- a/packages/cluster/Cargo.toml +++ b/packages/cluster/Cargo.toml @@ -16,6 +16,7 @@ async-trait = { workspace = true } serde = { workspace = true } poem-openapi = { version = "3.0" } bincode = { version = "1" } +protocol = { package = "atm0s-media-server-protocol", path = "../protocol", version = "0.1.0" } async-std = { workspace = true, optional = true } log = { workspace = true, optional = true } diff --git a/packages/cluster/src/define/endpoint.rs b/packages/cluster/src/define/endpoint.rs index 867baadd..7651764e 100644 --- a/packages/cluster/src/define/endpoint.rs +++ b/packages/cluster/src/define/endpoint.rs @@ -1,8 +1,9 @@ +use protocol::media_event_logs::MediaEndpointLogRequest; use transport::TrackId; use crate::{ - rpc::connector::MediaEndpointLogRequest, ClusterEndpointError, ClusterLocalTrackIncomingEvent, ClusterLocalTrackOutgoingEvent, ClusterPeerId, ClusterRemoteTrackIncomingEvent, - ClusterRemoteTrackOutgoingEvent, ClusterTrackMeta, ClusterTrackName, ClusterTrackUuid, + ClusterEndpointError, ClusterLocalTrackIncomingEvent, ClusterLocalTrackOutgoingEvent, ClusterPeerId, ClusterRemoteTrackIncomingEvent, ClusterRemoteTrackOutgoingEvent, ClusterTrackMeta, + ClusterTrackName, ClusterTrackUuid, }; #[async_trait::async_trait] @@ -11,7 +12,7 @@ pub trait ClusterEndpoint: Send + Sync { async fn recv(&mut self) -> Result; } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq)] pub enum ClusterEndpointOutgoingEvent { SubscribeRoom, UnsubscribeRoom, diff --git a/packages/cluster/src/define/rpc/connector.rs b/packages/cluster/src/define/rpc/connector.rs index 695eda75..785be0e8 100644 --- a/packages/cluster/src/define/rpc/connector.rs +++ b/packages/cluster/src/define/rpc/connector.rs @@ -1,165 +1,5 @@ -use std::net::SocketAddr; - -use atm0s_sdn::NodeId; -use media_utils::F32; use proc_macro::{IntoVecU8, TryFromSliceU8}; use serde::{Deserialize, Serialize}; -use transport::MediaKind; - -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub enum MediaStreamIssueType { - Connectivity { mos: F32<2>, lost_percents: F32<2>, jitter_ms: F32<2>, rtt_ms: u32 }, -} - -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub enum MediaEndpointEvent { - Routing { - user_agent: String, - gateway_node_id: NodeId, - }, - RoutingError { - reason: String, - gateway_node_id: NodeId, - media_node_ids: Vec, - }, - Routed { - media_node_id: NodeId, - after_ms: u32, - }, - Connecting { - user_agent: String, - remote: Option, - }, - ConnectError { - remote: Option, - error_code: String, - error_message: String, - }, - Connected { - after_ms: u32, - remote: Option, - }, - Reconnecting { - reason: String, - }, - Reconnected { - remote: Option, - }, - Disconnected { - error: Option, - sent_bytes: u64, - received_bytes: u64, - duration_ms: u64, - rtt: F32<2>, - }, - SessionStats { - received_bytes: u64, - receive_limit_bitrate: u32, - sent_bytes: u64, - send_est_bitrate: u32, - rtt: u16, - }, -} - -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub enum MediaReceiveStreamEvent { - StreamStarted { - name: String, - kind: MediaKind, - remote_peer: String, - remote_stream: String, - }, - StreamIssue { - name: String, - kind: MediaKind, - remote_peer: String, - remote_stream: String, - issue: MediaStreamIssueType, - }, - StreamStats { - name: String, - kind: MediaKind, - limit_bitrate: u32, - received_bytes: u64, - freeze: bool, - mos: Option>, - rtt: Option, - jitter: Option>, - lost: Option>, - }, - StreamEnded { - name: String, - kind: MediaKind, - sent_bytes: u64, - freeze_count: u32, - duration_ms: u64, - mos: Option<(F32<2>, F32<2>, F32<2>)>, - rtt: Option<(F32<2>, F32<2>, F32<2>)>, - jitter: Option<(F32<2>, F32<2>, F32<2>)>, - lost: Option<(F32<2>, F32<2>, F32<2>)>, - }, -} - -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub enum MediaSendStreamEvent { - StreamStarted { - name: String, - kind: MediaKind, - meta: String, - scaling: String, - }, - StreamIssue { - name: String, - kind: MediaKind, - issue: MediaStreamIssueType, - }, - StreamStats { - name: String, - kind: MediaKind, - sent_bytes: u64, - freeze: bool, - mos: Option>, - rtt: Option, - jitter: Option>, - lost: Option>, - }, - StreamEnded { - name: String, - kind: MediaKind, - received_bytes: u64, - duration_ms: u64, - freeze_count: u32, - mos: Option<(F32<2>, F32<2>, F32<2>)>, - rtt: Option<(F32<2>, F32<2>, F32<2>)>, - jitter: Option<(F32<2>, F32<2>, F32<2>)>, - lost: Option<(F32<2>, F32<2>, F32<2>)>, - }, -} - -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, IntoVecU8, TryFromSliceU8)] -pub enum MediaEndpointLogRequest { - SessionEvent { - ip: String, - version: Option, - location: Option<(F32<2>, F32<2>)>, - token: Vec, - ts: u64, - session_uuid: u64, - event: MediaEndpointEvent, - }, - ReceiveStreamEvent { - token: Vec, - ts: u64, - session_uuid: u64, - event: MediaReceiveStreamEvent, - }, - SendStreamEvent { - token: Vec, - ts: u64, - session_uuid: u64, - event: MediaSendStreamEvent, - }, -} #[derive(PartialEq, Clone, Debug, Serialize, Deserialize, IntoVecU8, TryFromSliceU8)] pub struct MediaEndpointLogResponse {} diff --git a/packages/endpoint/Cargo.toml b/packages/endpoint/Cargo.toml index e4c27614..55cdb8ea 100644 --- a/packages/endpoint/Cargo.toml +++ b/packages/endpoint/Cargo.toml @@ -11,6 +11,7 @@ description = "Media Endpoint for atm0s-media-server" cluster = { package = "atm0s-media-server-cluster", path = "../cluster", version = "0.1.0" } transport = { package = "atm0s-media-server-transport", path = "../transport", version = "0.1.0" } media-utils = { package = "atm0s-media-server-utils", path = "../media-utils", version = "0.1.0" } +protocol = { package = "atm0s-media-server-protocol", path = "../protocol", version = "0.1.0" } audio-mixer = { package = "atm0s-media-server-audio-mixer", path = "../audio-mixer", version = "0.1.0" } log = { workspace = true } async-std = { workspace = true } diff --git a/packages/endpoint/src/endpoint_wrap/internal.rs b/packages/endpoint/src/endpoint_wrap/internal.rs index ae1a5ee5..1a5905f0 100644 --- a/packages/endpoint/src/endpoint_wrap/internal.rs +++ b/packages/endpoint/src/endpoint_wrap/internal.rs @@ -33,7 +33,7 @@ pub enum MediaEndpointInternalEvent { ConnectionError(TransportError), } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq)] pub enum MediaInternalAction { Internal(MediaEndpointInternalEvent), Endpoint(TransportOutgoingEvent), diff --git a/packages/endpoint/src/middleware.rs b/packages/endpoint/src/middleware.rs index efde21ea..a20d9f7b 100644 --- a/packages/endpoint/src/middleware.rs +++ b/packages/endpoint/src/middleware.rs @@ -9,7 +9,7 @@ use crate::{ pub mod logger; pub mod mix_minus; -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq)] pub enum MediaEndpointMiddlewareOutput { Endpoint(TransportOutgoingEvent), Cluster(ClusterEndpointOutgoingEvent), diff --git a/packages/endpoint/src/middleware/logger.rs b/packages/endpoint/src/middleware/logger.rs index 55231502..6dece4be 100644 --- a/packages/endpoint/src/middleware/logger.rs +++ b/packages/endpoint/src/middleware/logger.rs @@ -1,7 +1,9 @@ use std::collections::VecDeque; -use cluster::rpc::connector::{MediaEndpointEvent, MediaEndpointLogRequest}; -use media_utils::F32; +use protocol::media_event_logs::{ + session_event::{self, SessionConnectError, SessionConnected, SessionConnecting, SessionDisconnected, SessionReconnected, SessionReconnecting}, + F32p2, MediaEndpointLogEvent, MediaEndpointLogRequest, SessionEvent, +}; use transport::{TransportError, TransportIncomingEvent, TransportStateEvent}; use crate::{MediaEndpointMiddleware, MediaEndpointMiddlewareOutput}; @@ -19,17 +21,20 @@ impl MediaEndpointEventLogger { } } - fn build_event(&self, now_ms: u64, event: MediaEndpointEvent) -> MediaEndpointMiddlewareOutput { + fn build_event(&self, now_ms: u64, event: session_event::Event) -> MediaEndpointMiddlewareOutput { log::info!("sending event out to connector {:?}", event); - let event = MediaEndpointLogRequest::SessionEvent { - ip: "127.0.0.1".to_string(), //TODO - version: None, - location: None, - token: vec![], - ts: now_ms, - session_uuid: 0, //TODO - event, + let event: MediaEndpointLogRequest = MediaEndpointLogRequest { + event: Some(MediaEndpointLogEvent::SessionEvent(SessionEvent { + ip: "127.0.0.1".to_string(), //TODO + version: None, + location: None, + token: vec![], + ts: now_ms, + session_uuid: 0, //TODO + event: Some(event), + })), }; + MediaEndpointMiddlewareOutput::Cluster(cluster::ClusterEndpointOutgoingEvent::MediaEndpointLog(event)) } } @@ -39,10 +44,10 @@ impl MediaEndpointMiddleware for MediaEndpointEventLogger { self.started_ms = Some(now_ms); self.outputs.push_back(self.build_event( now_ms, - MediaEndpointEvent::Connecting { + session_event::Event::Connecting(SessionConnecting { user_agent: "TODO".to_string(), //TODO remote: None, //TODO - }, + }), )); } @@ -55,38 +60,38 @@ impl MediaEndpointMiddleware for MediaEndpointEventLogger { TransportStateEvent::Connected => { self.outputs.push_back(self.build_event( now_ms, - MediaEndpointEvent::Connected { + session_event::Event::Connected(SessionConnected { after_ms: (now_ms - self.started_ms.expect("Should has started")) as u32, remote: None, //TODO - }, + }), )); } TransportStateEvent::Reconnecting => { self.outputs.push_back(self.build_event( now_ms, - MediaEndpointEvent::Reconnecting { + session_event::Event::Reconnecting(SessionReconnecting { reason: "TODO".to_string(), //TODO - }, + }), )); } TransportStateEvent::Reconnected => { self.outputs.push_back(self.build_event( now_ms, - MediaEndpointEvent::Reconnected { + session_event::Event::Reconnected(SessionReconnected{ remote: None, //TODO - }, + }), )); } TransportStateEvent::Disconnected => { self.outputs.push_back(self.build_event( now_ms, - MediaEndpointEvent::Disconnected { + session_event::Event::Disconnected(SessionDisconnected { error: None, duration_ms: now_ms - self.started_ms.expect("Should has started"), - received_bytes: 0, //TODO - rtt: F32::new(0.0), //TODO - sent_bytes: 0, //TODO - }, + received_bytes: 0, //TODO + rtt: F32p2 { value: 0 }, //TODO + sent_bytes: 0, //TODO + }), )); } }, @@ -101,23 +106,23 @@ impl MediaEndpointMiddleware for MediaEndpointEventLogger { TransportError::ConnectError(_) => { self.outputs.push_back(self.build_event( now_ms, - MediaEndpointEvent::ConnectError { + session_event::Event::ConnectError(SessionConnectError { remote: None, //TODO error_code: "TODO".to_string(), //TODO error_message: "TODO".to_string(), //TODO - }, + }), )); } TransportError::ConnectionError(_) => { self.outputs.push_back(self.build_event( now_ms, - MediaEndpointEvent::Disconnected { + session_event::Event::Disconnected(SessionDisconnected { error: Some("TIMEOUT".to_string()), //TODO duration_ms: now_ms - self.started_ms.expect("Should has started"), - received_bytes: 0, //TODO - rtt: F32::new(0.0), //TODO - sent_bytes: 0, //TODO - }, + received_bytes: 0, //TODO + rtt: F32p2 { value: 0 }, //TODO + sent_bytes: 0, + }), )); } _ => {} @@ -137,3 +142,138 @@ impl MediaEndpointMiddleware for MediaEndpointEventLogger { fn before_drop(&mut self, _now_ms: u64) {} } +#[cfg(test)] +mod tests { + use super::*; + use protocol::media_event_logs::session_event::*; + + #[test] + fn test_on_transport_connected() { + let mut logger = MediaEndpointEventLogger::new(); + let event = TransportIncomingEvent::State(TransportStateEvent::Connected); + logger.on_start(0); + logger.on_transport(1000, &event); + assert_eq!( + logger.pop_action(0), + Some(MediaEndpointMiddlewareOutput::Cluster(cluster::ClusterEndpointOutgoingEvent::MediaEndpointLog( + MediaEndpointLogRequest { + event: Some(MediaEndpointLogEvent::SessionEvent(SessionEvent { + ip: "127.0.0.1".to_string(), + version: None, + location: None, + token: vec![], + ts: 1000, + session_uuid: 0, + event: Some(Event::Connected(SessionConnected { after_ms: 1000, remote: None })), + })), + } + ))) + ); + } + + #[test] + fn test_on_transport_reconnecting() { + let mut logger = MediaEndpointEventLogger::new(); + let event = TransportIncomingEvent::State(TransportStateEvent::Reconnecting); + logger.on_start(0); + logger.on_transport(1000, &event); + assert_eq!( + logger.pop_action(0), + Some(MediaEndpointMiddlewareOutput::Cluster(cluster::ClusterEndpointOutgoingEvent::MediaEndpointLog( + MediaEndpointLogRequest { + event: Some(MediaEndpointLogEvent::SessionEvent(SessionEvent { + ip: "127.0.0.1".to_string(), + version: None, + location: None, + token: vec![], + ts: 1000, + session_uuid: 0, + event: Some(Event::Reconnecting(SessionReconnecting { reason: "TODO".to_string() })), + })), + } + ))) + ); + } + + #[test] + fn test_on_transport_reconnected() { + let mut logger = MediaEndpointEventLogger::new(); + let event = TransportIncomingEvent::State(TransportStateEvent::Reconnected); + logger.on_start(0); + logger.on_transport(1000, &event); + assert_eq!( + logger.pop_action(0), + Some(MediaEndpointMiddlewareOutput::Cluster(cluster::ClusterEndpointOutgoingEvent::MediaEndpointLog( + MediaEndpointLogRequest { + event: Some(MediaEndpointLogEvent::SessionEvent(SessionEvent { + ip: "127.0.0.1".to_string(), + version: None, + location: None, + token: vec![], + ts: 1000, + session_uuid: 0, + event: Some(Event::Reconnected(SessionReconnected { remote: None })), + })), + } + ))) + ); + } + + #[test] + fn test_on_transport_disconnected() { + let mut logger = MediaEndpointEventLogger::new(); + let event = TransportIncomingEvent::State(TransportStateEvent::Disconnected); + logger.on_start(0); + logger.on_transport(1000, &event); + assert_eq!( + logger.pop_action(0), + Some(MediaEndpointMiddlewareOutput::Cluster(cluster::ClusterEndpointOutgoingEvent::MediaEndpointLog( + MediaEndpointLogRequest { + event: Some(MediaEndpointLogEvent::SessionEvent(SessionEvent { + ip: "127.0.0.1".to_string(), + version: None, + location: None, + token: vec![], + ts: 1000, + session_uuid: 0, + event: Some(Event::Disconnected(SessionDisconnected { + error: None, + duration_ms: 1000, + received_bytes: 0, + rtt: F32p2 { value: 0 }, + sent_bytes: 0, + })), + })), + } + ))) + ); + } + + #[test] + fn test_on_transport_error_connect_error() { + let mut logger = MediaEndpointEventLogger::new(); + let error = TransportError::ConnectError(transport::ConnectErrorReason::Timeout); + logger.on_start(0); + logger.on_transport_error(1000, &error); + assert_eq!( + logger.pop_action(0), + Some(MediaEndpointMiddlewareOutput::Cluster(cluster::ClusterEndpointOutgoingEvent::MediaEndpointLog( + MediaEndpointLogRequest { + event: Some(MediaEndpointLogEvent::SessionEvent(SessionEvent { + ip: "127.0.0.1".to_string(), + version: None, + location: None, + token: vec![], + ts: 1000, + session_uuid: 0, + event: Some(Event::ConnectError(SessionConnectError { + remote: None, + error_code: "TODO".to_string(), + error_message: "TODO".to_string(), + })), + })), + } + ))) + ); + } +} diff --git a/packages/protocol/Cargo.toml b/packages/protocol/Cargo.toml new file mode 100644 index 00000000..f801dda2 --- /dev/null +++ b/packages/protocol/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "atm0s-media-server-protocol" +version = "0.1.0" +edition = "2021" +description = "Cluster Protobuf definitions for atm0s-media-server" +license = "MIT" +build = "build.rs" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +prost = { workspace = true } + +[build-dependencies] +prost-build = "0.12" diff --git a/packages/protocol/build.rs b/packages/protocol/build.rs new file mode 100644 index 00000000..cb52bcb4 --- /dev/null +++ b/packages/protocol/build.rs @@ -0,0 +1,10 @@ +extern crate prost_build; + +fn main() { + let mut config = prost_build::Config::new(); + // config.type_attribute(".", ""); + // config.type_attribute(".receive_stream_event", "#[derive(Eq)]"); + // config.type_attribute(".send_stream_event", "#[derive(Eq)]"); + config.compile_protos(&["src/atm0s.proto"], + &["src/"]).unwrap(); +} diff --git a/packages/protocol/src/atm0s.proto b/packages/protocol/src/atm0s.proto new file mode 100644 index 00000000..128d9d52 --- /dev/null +++ b/packages/protocol/src/atm0s.proto @@ -0,0 +1,5 @@ +syntax = "proto3"; + +import "media_endpoint_log.proto"; + +package atm0s; diff --git a/packages/protocol/src/lib.rs b/packages/protocol/src/lib.rs new file mode 100644 index 00000000..764c70aa --- /dev/null +++ b/packages/protocol/src/lib.rs @@ -0,0 +1,35 @@ +use std::marker::PhantomData; + +use prost::Message; + +pub mod media_event_logs { + use std::vec; + + use prost::Message; + + pub type MediaEndpointLogEvent = media_endpoint_log_request::Event; + pub type MediaSessionEvent = session_event::Event; + + include!(concat!(env!("OUT_DIR"), "/atm0s.media_endpoint_log.rs")); + impl From for Vec { + fn from(val: MediaEndpointLogRequest) -> Self { + val.encode_to_vec() + } + } + + impl TryFrom<&[u8]> for MediaEndpointLogRequest{ + type Error = prost::DecodeError; + + fn try_from(value: &[u8]) -> Result { + Self::decode(value) + } + } + + impl From for Vec { + fn from(val: MediaEndpointLogEvent) -> Self { + let mut buf = vec![]; + val.encode(&mut buf); + buf + } + } +} diff --git a/packages/protocol/src/media_endpoint_log.proto b/packages/protocol/src/media_endpoint_log.proto new file mode 100644 index 00000000..19289712 --- /dev/null +++ b/packages/protocol/src/media_endpoint_log.proto @@ -0,0 +1,221 @@ +syntax = "proto2"; + +package atm0s.media_endpoint_log; + +// F32p2 is used to represent a floating point number with the precision of 2. +message F32p2 { + required uint32 value = 1; +} + +message Stats { + required F32p2 min = 1; + required F32p2 avg = 2; + required F32p2 max = 3; +} + +message SessionEvent { + message Location { + required F32p2 lat = 1; + required F32p2 lng = 2; + } + message SessionRouting { + required string user_agent = 1; + required uint32 gateway_node_id = 2; + } + + message SessionRoutingError { + required string reason = 1; + required uint32 gateway_node_id = 2; + repeated uint32 media_node_ids = 3; + } + + message SessionRouted { + required uint32 after_ms = 1; + required uint32 media_node_id = 2; + } + + message SessionConnecting { + required string user_agent = 1; + optional string remote = 2; + } + + message SessionConnectError { + optional string remote = 1; + required string error_code = 2; + required string error_message = 3; + } + + message SessionConnected { + optional string remote = 1; + required uint32 after_ms = 2; + } + + message SessionReconnecting { + required string reason = 1; + } + + message SessionReconnected { + optional string remote = 1; + } + + message SessionDisconnected { + optional string error = 1; + required uint64 sent_bytes = 2; + required uint64 received_bytes = 3; + required uint64 duration_ms = 4; + required F32p2 rtt = 5; + } + + message SessionStats { + required uint64 received_bytes = 1; + required uint32 receive_limit_bitrate = 2; + required uint64 sent_bytes = 3; + required uint32 send_est_bitrate = 4; + required uint32 rtt = 5; + } + + required string ip = 1; + optional string version = 2; + optional Location location = 3; + required bytes token = 4; + required uint64 ts = 5; + required uint64 session_uuid = 6; + oneof event { + SessionRouting routing = 7; + SessionRoutingError routing_error = 8; + SessionRouted routed = 9; + SessionConnecting connecting = 10; + SessionConnectError connect_error = 11; + SessionConnected connected = 12; + SessionReconnecting reconnecting = 13; + SessionReconnected reconnected = 14; + SessionDisconnected disconnected = 15; + SessionStats stats = 16; + } +} + +message MediaStreamIssueConnectivity { + required F32p2 mos = 1; + required F32p2 lost_percents = 2; + required F32p2 jitter_ms = 3; + required uint32 rtt_ms = 4; +} + +enum MediaKind { + AUDIO = 0; + VIDEO = 1; +} + +message ReceiveStreamEvent { + message ReceivedStreamStarted { + required string name = 1; + required MediaKind kind = 2; + required string remote_peer = 3; + required string remote_stream = 4; + } + + message ReceivedStreamIssue { + required string name = 1; + required MediaKind kind = 2; + required string remote_peer = 3; + required string remote_stream = 4; + oneof issue { + MediaStreamIssueConnectivity connectivity = 5; + } + } + + message ReceivedStreamStats { + required string name = 1; + required MediaKind kind = 2; + required uint32 limit_bitrate = 3; + required uint64 received_bytes = 4; + required bool freeze = 5; + optional F32p2 mos = 6; + optional F32p2 lost = 7; + optional F32p2 jitter = 8; + optional uint32 rtt = 9; + } + + message ReceivedStreamEnded { + required string name = 1; + required MediaKind kind = 2; + required uint64 sent_bytes = 3; + required uint32 freeze_count = 4; + required uint64 duration_ms = 5; + optional Stats mos = 6; + optional Stats lost = 7; + optional Stats jitter = 8; + optional Stats rtt = 9; + } + + required bytes token = 1; + required uint64 ts = 2; + required uint64 session_uuid = 3; + oneof event { + ReceivedStreamStarted started = 4; + ReceivedStreamIssue issue = 5; + ReceivedStreamStats stats = 6; + ReceivedStreamEnded ended = 7; + } +} + +message SendStreamEvent { + message SendStreamStarted { + required string name = 1; + required MediaKind kind = 2; + required string remote_peer = 3; + required string remote_stream = 4; + } + + message SendStreamIssue { + required string name = 1; + required MediaKind kind = 2; + required string remote_peer = 3; + required string remote_stream = 4; + oneof issue { + MediaStreamIssueConnectivity connectivity = 5; + } + } + + message SendStreamStats { + required string name = 1; + required MediaKind kind = 2; + required uint64 sent_bytes = 3; + required bool freeze = 4; + optional F32p2 mos = 5; + optional F32p2 lost = 6; + optional F32p2 jitter = 7; + optional uint32 rtt = 8; + } + + message SendStreamEnded { + required string name = 1; + required MediaKind kind = 2; + required uint64 received_bytes = 3; + required uint32 freeze_count = 4; + required uint64 duration_ms = 5; + optional Stats mos = 6; + optional Stats lost = 7; + optional Stats jitter = 8; + optional Stats rtt = 9; + } + + required bytes token = 1; + required uint64 ts = 2; + required uint64 session_uuid = 3; + oneof event { + SendStreamStarted started = 4; + SendStreamIssue issue = 5; + SendStreamStats stats = 6; + SendStreamEnded ended = 7; + } +} + +message MediaEndpointLogRequest { + oneof event { + SessionEvent session_event = 1; + ReceiveStreamEvent receive_stream_event = 2; + SendStreamEvent send_stream_event = 3; + } +} + diff --git a/servers/media-server/Cargo.toml b/servers/media-server/Cargo.toml index 48923dcb..2f0fdf4e 100644 --- a/servers/media-server/Cargo.toml +++ b/servers/media-server/Cargo.toml @@ -10,11 +10,12 @@ description = "Decentralized media-server with WebRTC/RTMP/Whip/Whep support" [dependencies] clap = { version = "4.4.11", features = ["derive", "env"] } endpoint = { package = "atm0s-media-server-endpoint", path = "../../packages/endpoint", version = "0.1.0" } -transport = { package = "atm0s-media-server-transport", path = "../../packages/transport", version = "0.1.0"} +transport = { package = "atm0s-media-server-transport", path = "../../packages/transport", version = "0.1.0" } cluster = { package = "atm0s-media-server-cluster", path = "../../packages/cluster", version = "0.1.0" } +protocol = { package = "atm0s-media-server-protocol", path = "../../packages/protocol", version = "0.1.0" } media-utils = { package = "atm0s-media-server-utils", path = "../../packages/media-utils", version = "0.1.0" } transport-webrtc = { package = "atm0s-media-server-transport-webrtc", path = "../../transports/webrtc", version = "0.1.0", optional = true } -transport-rtmp = { package = "atm0s-media-server-transport-rtmp", path = "../../transports/rtmp", version = "0.1.0", optional = true } +transport-rtmp = { package = "atm0s-media-server-transport-rtmp", path = "../../transports/rtmp", version = "0.1.0", optional = true } transport-sip = { package = "atm0s-media-server-transport-sip", path = "../../transports/sip", version = "0.1.0", optional = true } async-std = { workspace = true } async-trait = { workspace = true } @@ -24,11 +25,14 @@ log = { workspace = true } poem = { version = "1.3", features = ["embed"] } poem-openapi = { version = "3.0", features = ["swagger-ui", "static-files"] } serde = { workspace = true } +serde_json = { workspace = true, optional = true } tracing-subscriber = { version = "0.3.18", features = ["env-filter", "std"] } rust-embed = { version = "8.1", optional = true } rsip = { version = "0.4.0", optional = true } metrics-dashboard = { version = "0.1.3", features = ["system"] } metrics = "0.21.1" +nats = { version = "0.24.1", optional = true } +prost = { workspace = true, optional = true } [features] default = ["embed-samples", "gateway", "webrtc", "rtmp", "sip", "connector", "token_generate"] @@ -37,5 +41,5 @@ webrtc = ["transport-webrtc"] rtmp = ["transport-rtmp"] sip = ["rsip", "transport-sip"] gateway = [] -connector = [] +connector = ["nats", "prost"] token_generate = [] diff --git a/servers/media-server/src/main.rs b/servers/media-server/src/main.rs index 40771e5d..4ed6ea12 100644 --- a/servers/media-server/src/main.rs +++ b/servers/media-server/src/main.rs @@ -132,8 +132,11 @@ async fn main() { } #[cfg(feature = "connector")] Servers::Connector(opts) => { + use server::MediaServerContext; + let token = Arc::new(cluster::implement::jwt_static::JwtStaticToken::new(&args.token)); + let ctx = MediaServerContext::new(args.node_id, opts.max_conn, Arc::new(SystemTimer()), token.clone(), token); let (cluster, rpc_endpoint) = ServerSdn::new(args.node_id, args.sdn_port, CONNECTOR_SERVICE, config).await; - if let Err(e) = run_connector_server(args.http_port, opts, cluster, rpc_endpoint).await { + if let Err(e) = run_connector_server(args.http_port, opts, ctx, cluster, rpc_endpoint).await { log::error!("[ConnectorServer] error {}", e); } } diff --git a/servers/media-server/src/server/connector.rs b/servers/media-server/src/server/connector.rs index c10b8584..de21647d 100644 --- a/servers/media-server/src/server/connector.rs +++ b/servers/media-server/src/server/connector.rs @@ -7,19 +7,39 @@ use futures::{select, FutureExt}; use metrics_dashboard::build_dashboard_route; use poem::Route; use poem_openapi::OpenApiService; +use protocol::media_event_logs::MediaEndpointLogRequest; use crate::rpc::http::HttpRpcServer; mod rpc; +mod transports; -use self::rpc::{cluster::ConnectorClusterRpc, http::ConnectorHttpApis, RpcEvent}; +use self::{ + rpc::{cluster::ConnectorClusterRpc, http::ConnectorHttpApis, RpcEvent, InternalControl}, + transports::nats::NatsTransporter, + transports::{parse_uri, ConnectorTransporter}, +}; + +use super::MediaServerContext; /// Media Server Webrtc #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] -pub struct ConnectorArgs {} +pub struct ConnectorArgs { + /// Message Queue URI in the form of `amqp://user:pass@host:port/vhost` + #[arg(env, long, default_value = "nats://localhost:4222")] + mq_uri: String, + + /// MQ Channel + #[arg(env, long, default_value = "atm0s/event_log")] + mq_channel: String, + + /// Max conn + #[arg(env, long, default_value_t = 100)] + pub max_conn: u64, +} -pub async fn run_connector_server(http_port: u16, _opts: ConnectorArgs, _cluster: C, rpc_endpoint: RPC) -> Result<(), &'static str> +pub async fn run_connector_server(http_port: u16, _opts: ConnectorArgs, ctx: MediaServerContext, _cluster: C, rpc_endpoint: RPC) -> Result<(), &'static str> where C: Cluster + Send + 'static, CR: ClusterEndpoint + Send + 'static, @@ -29,8 +49,28 @@ where { let mut rpc_endpoint = ConnectorClusterRpc::new(rpc_endpoint); let mut http_server: HttpRpcServer = crate::rpc::http::HttpRpcServer::new(http_port); + let (protocol, _) = parse_uri(&_opts.mq_uri).map_err(|e| { + log::error!("Error parsing MQ URI: {:?}", e); + "Error parsing MQ URI" + })?; + let transporter: Result>, String> = match protocol.as_str() { + "nats" => { + let nats = NatsTransporter::new(_opts.mq_uri.clone(), _opts.mq_channel.clone()).await; + match nats { + Ok(nats) => Ok(Box::new(nats)), + Err(e) => { + log::error!("Error creating Nats transporter: {:?}", e); + return Err("Error creating Nats transporter"); + } + } + } + _ => { + log::error!("Unsupported transporter"); + return Err("Unsupported transporter"); + } + }; - let api_service = OpenApiService::new(ConnectorHttpApis, "Connector Server", "1.0.0").server("http://localhost:3000"); + let api_service = OpenApiService::new(ConnectorHttpApis, "Connector Server", "1.0.0").server(format!("http://localhost:{}", http_port)); let ui = api_service.swagger_ui(); let spec = api_service.spec(); @@ -40,7 +80,7 @@ where .nest("/ui/", ui) .at("/spec/", poem::endpoint::make_sync(move |_| spec.clone())); - http_server.start(route, ()).await; + http_server.start(route, ctx).await; loop { let rpc = select! { @@ -55,7 +95,13 @@ where match rpc { RpcEvent::MediaEndpointLog(req) => { log::info!("On media endpoint log {:?}", req.param()); - //TODO emit event to external queue: NATS, Kafka, etc + if let Ok(ref transport) = transporter { + let data = req.param(); + + if let Err(e) = transport.send(data).await { + log::error!("Error sending message: {:?}", e); + } + } req.answer(Ok(MediaEndpointLogResponse {})); } } diff --git a/servers/media-server/src/server/connector/rpc.rs b/servers/media-server/src/server/connector/rpc.rs index a4e4b071..ebe6732e 100644 --- a/servers/media-server/src/server/connector/rpc.rs +++ b/servers/media-server/src/server/connector/rpc.rs @@ -1,9 +1,11 @@ -use ::cluster::rpc::connector::{MediaEndpointLogRequest, MediaEndpointLogResponse}; +use ::cluster::rpc::connector::MediaEndpointLogResponse; use ::cluster::rpc::RpcReqRes; +use protocol::media_event_logs::MediaEndpointLogRequest; pub mod cluster; pub mod http; +pub enum InternalControl {} pub enum RpcEvent { MediaEndpointLog(Box>), } diff --git a/servers/media-server/src/server/connector/transports/mod.rs b/servers/media-server/src/server/connector/transports/mod.rs new file mode 100644 index 00000000..48b82c10 --- /dev/null +++ b/servers/media-server/src/server/connector/transports/mod.rs @@ -0,0 +1,30 @@ +use async_trait::async_trait; +use prost::Message; + +pub mod nats; + +#[async_trait] +pub trait ConnectorTransporter: Send + Sync { + async fn send(&self, data: &M) -> Result<(), String>; + async fn close(&mut self) -> Result<(), String>; +} + +pub fn parse_uri(uri: &str) -> Result<(String, String), String> { + let mut parts = uri.splitn(2, "://"); + let transport = parts.next().ok_or("Invalid URI")?; + let uri = parts.next().ok_or("Invalid URI")?; + Ok((transport.to_string(), uri.to_string())) +} + +#[cfg(test)] +mod test { + #[test] + fn test_parse_uri() { + let uri = "nats://localhost:4222"; + + let parsed = super::parse_uri(&uri); + + assert!(parsed.is_ok()); + assert_eq!(parsed.unwrap(), ("nats".to_string(), "localhost:4222".to_string())); + } +} diff --git a/servers/media-server/src/server/connector/transports/nats.rs b/servers/media-server/src/server/connector/transports/nats.rs new file mode 100644 index 00000000..26f7e98f --- /dev/null +++ b/servers/media-server/src/server/connector/transports/nats.rs @@ -0,0 +1,49 @@ +use std::marker::PhantomData; + +use async_trait::async_trait; +use prost::Message; + +use super::ConnectorTransporter; + +pub struct NatsTransporter { + pub conn: nats::asynk::Connection, + pub subject: String, + pub sub: Option, + _tmp: PhantomData, +} + +impl NatsTransporter { + pub async fn new(uri: String, subject: String) -> Result { + let res = nats::asynk::connect(&uri).await; + + let conn = match res { + Ok(conn) => conn, + Err(e) => { + return Err(e.to_string()); + } + }; + + Ok(Self { + conn, + subject, + sub: None, + _tmp: Default::default(), + }) + } +} + +#[async_trait] +impl ConnectorTransporter for NatsTransporter { + async fn send(&self, data: &M) -> Result<(), String> { + let data: Vec = data.encode_to_vec(); + self.conn.publish(&self.subject, data).await.map_err(|e| e.to_string())?; + return Ok(()); + } + + async fn close(&mut self) -> Result<(), String> { + if let Some(sub) = self.sub.take() { + let _ = sub.unsubscribe().await.map_err(|e: std::io::Error| e.to_string())?; + } + Ok(()) + } +} diff --git a/servers/media-server/src/server/gateway/webrtc_route.rs b/servers/media-server/src/server/gateway/webrtc_route.rs index 220dca7f..61046242 100644 --- a/servers/media-server/src/server/gateway/webrtc_route.rs +++ b/servers/media-server/src/server/gateway/webrtc_route.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use cluster::{ implement::NodeId, rpc::{ - connector::{MediaEndpointEvent, MediaEndpointLogRequest, MediaEndpointLogResponse}, + connector::MediaEndpointLogResponse, gateway::{NodeHealthcheckRequest, NodeHealthcheckResponse}, RpcEmitter, RpcReqRes, RPC_MEDIA_ENDPOINT_LOG, RPC_NODE_HEALTHCHECK, }, @@ -12,6 +12,10 @@ use cluster::{ use futures::FutureExt as _; use media_utils::{ErrorDebugger, Timer}; use metrics::increment_counter; +use protocol::media_event_logs::{ + session_event::{SessionRouted, SessionRouting, SessionRoutingError}, + MediaEndpointLogEvent, SessionEvent, MediaSessionEvent, +}; use crate::server::gateway::{GATEWAY_SESSIONS_CONNECT_COUNT, GATEWAY_SESSIONS_CONNECT_ERROR}; @@ -51,7 +55,7 @@ async fn select_node(emitter: &EMITTER, no } // TODO running in queue and retry if failed. It should retry when connector service not accept -fn emit_endpoint_event(emitter: &EMITTER, timer: &Arc, session_uuid: u64, ip: &str, version: &Option, event: MediaEndpointEvent) { +fn emit_endpoint_event(emitter: &EMITTER, timer: &Arc, session_uuid: u64, ip: &str, version: &Option, event: MediaSessionEvent) { let emitter = emitter.clone(); let ts = timer.now_ms(); let ip = ip.to_string(); @@ -62,15 +66,15 @@ fn emit_endpoint_event(emitter: &EMITTER, CONNECTOR_SERVICE, None, RPC_MEDIA_ENDPOINT_LOG, - MediaEndpointLogRequest::SessionEvent { + MediaEndpointLogEvent::SessionEvent(SessionEvent { ip, location: None, version, token: vec![], ts, session_uuid, - event, - }, + event: Some(event), + }), 1000, ) .await @@ -97,10 +101,10 @@ pub fn route_to_node( { increment_counter!(GATEWAY_SESSIONS_CONNECT_COUNT); let started_ms = timer.now_ms(); - let event = MediaEndpointEvent::Routing { + let event = MediaSessionEvent::Routing(SessionRouting { user_agent: user_agent.to_string(), gateway_node_id, - }; + }); emit_endpoint_event(&emitter, &timer, session_uuid, ip, version, event); let nodes = gateway_logic.best_nodes(service, 60, 80, 3); @@ -117,16 +121,16 @@ pub fn route_to_node( log::info!("[Gateway] webrtc connect res from media-server {:?}", res.as_ref().map(|_| ())); let event = if res.is_err() { increment_counter!(GATEWAY_SESSIONS_CONNECT_ERROR); - MediaEndpointEvent::RoutingError { + MediaSessionEvent::RoutingError(SessionRoutingError { reason: "NODE_ANSWER_ERROR".to_string(), gateway_node_id, media_node_ids: vec![node_id], - } + }) } else { - MediaEndpointEvent::Routed { + MediaSessionEvent::Routed(SessionRouted { media_node_id: node_id, after_ms: (timer.now_ms() - started_ms) as u32, - } + }) }; emit_endpoint_event(&emitter, &timer, session_uuid, &ip, &version, event); @@ -134,22 +138,23 @@ pub fn route_to_node( } else { log::warn!("[Gateway] webrtc connect but ping nodes {:?} timeout", nodes); increment_counter!(GATEWAY_SESSIONS_CONNECT_ERROR); - let event = MediaEndpointEvent::RoutingError { + let event = MediaSessionEvent::RoutingError(SessionRoutingError { reason: "NODE_PING_TIMEOUT".to_string(), gateway_node_id, media_node_ids: nodes, - }; + }); emit_endpoint_event(&emitter, &timer, session_uuid, &ip, &version, event); req.answer(Err("NODE_PING_TIMEOUT")); } }); } else { increment_counter!(GATEWAY_SESSIONS_CONNECT_ERROR); - let event = MediaEndpointEvent::RoutingError { + let event = MediaSessionEvent::RoutingError(SessionRoutingError { reason: "NODE_POOL_EMPTY".to_string(), gateway_node_id, media_node_ids: vec![], - }; + }); + emit_endpoint_event(&emitter, &timer, session_uuid, ip, version, event); log::warn!("[Gateway] webrtc connect but media-server pool empty");