Skip to content

Commit

Permalink
Add rust module support for active speaker observer
Browse files Browse the repository at this point in the history
  • Loading branch information
ggarber committed Jul 15, 2021
1 parent ceb78dc commit 4c608ca
Show file tree
Hide file tree
Showing 9 changed files with 784 additions and 1 deletion.
6 changes: 6 additions & 0 deletions rust/src/lib.rs
Expand Up @@ -68,6 +68,12 @@ pub mod audio_level_observer {
pub use crate::router::audio_level_observer::*;
}

pub mod active_speaker_observer {
//! An active speaker observer monitors the speaking activity of the selected audio producers.

pub use crate::router::active_speaker_observer::*;
}

pub mod consumer {
//! A consumer represents an audio or video source being forwarded from a mediasoup router to an
//! endpoint. It's created on top of a transport that defines how the media packets are carried.
Expand Down
23 changes: 23 additions & 0 deletions rust/src/messages.rs
@@ -1,4 +1,5 @@
use crate::audio_level_observer::AudioLevelObserverOptions;
use crate::active_speaker_observer::ActiveSpeakerObserverOptions;
use crate::consumer::{
ConsumerDump, ConsumerId, ConsumerLayers, ConsumerScore, ConsumerStats, ConsumerTraceEventType,
ConsumerType,
Expand Down Expand Up @@ -414,6 +415,28 @@ request_response!(
},
);

#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct RouterCreateActiveSpeakerObserverData {
pub(crate) interval: u16,
}

impl RouterCreateActiveSpeakerObserverData {
pub(crate) fn from_options(active_speaker_observer_options: &ActiveSpeakerObserverOptions) -> Self {
Self {
interval: active_speaker_observer_options.interval,
}
}
}

request_response!(
"router.createActiveSpeakerObserver",
RouterCreateActiveSpeakerObserverRequest {
internal: RtpObserverInternal,
data: RouterCreateActiveSpeakerObserverData,
},
);

request_response!(
"transport.close",
TransportCloseRequest {
Expand Down
69 changes: 68 additions & 1 deletion rust/src/router.rs
Expand Up @@ -7,6 +7,7 @@
//! mediasoup routers, even in different physicals hosts).

pub(super) mod audio_level_observer;
pub(super) mod active_speaker_observer;
pub(super) mod consumer;
pub(super) mod data_consumer;
pub(super) mod data_producer;
Expand All @@ -21,6 +22,7 @@ pub(super) mod transport;
pub(super) mod webrtc_transport;

use crate::audio_level_observer::{AudioLevelObserver, AudioLevelObserverOptions};
use crate::active_speaker_observer::{ActiveSpeakerObserver, ActiveSpeakerObserverOptions};
use crate::consumer::{Consumer, ConsumerId, ConsumerOptions};
use crate::data_consumer::{DataConsumer, DataConsumerId, DataConsumerOptions};
use crate::data_producer::{
Expand All @@ -29,7 +31,9 @@ use crate::data_producer::{
use crate::data_structures::{AppData, TransportListenIp};
use crate::direct_transport::{DirectTransport, DirectTransportOptions};
use crate::messages::{
RouterCloseRequest, RouterCreateAudioLevelObserverData, RouterCreateAudioLevelObserverRequest,
RouterCloseRequest,
RouterCreateAudioLevelObserverData, RouterCreateAudioLevelObserverRequest,
RouterCreateActiveSpeakerObserverData, RouterCreateActiveSpeakerObserverRequest,
RouterCreateDirectTransportData, RouterCreateDirectTransportRequest,
RouterCreatePipeTransportData, RouterCreatePipeTransportRequest,
RouterCreatePlainTransportData, RouterCreatePlainTransportRequest,
Expand Down Expand Up @@ -306,6 +310,8 @@ impl<'a> Deref for NewTransport<'a> {
pub enum NewRtpObserver<'a> {
/// Audio level observer
AudioLevel(&'a AudioLevelObserver),
/// Active speaker observer
ActiveSpeaker(&'a ActiveSpeakerObserver),
}

impl<'a> Deref for NewRtpObserver<'a> {
Expand All @@ -314,6 +320,7 @@ impl<'a> Deref for NewRtpObserver<'a> {
fn deref(&self) -> &Self::Target {
match self {
Self::AudioLevel(observer) => *observer as &Self::Target,
Self::ActiveSpeaker(observer) => *observer as &Self::Target,
}
}
}
Expand Down Expand Up @@ -828,6 +835,66 @@ impl Router {
Ok(audio_level_observer)
}

/// Create an [`ActiveSpeakerObserver`].
///
/// Router will be kept alive as long as at least one observer instance is alive.
///
/// # Example
/// ```rust
/// use mediasoup::active_speaker_observer::ActiveSpeakerObserverOptions;
///
/// # async fn f(router: mediasoup::router::Router) -> Result<(), Box<dyn std::error::Error>> {
/// let observer = router
/// .create_active_speaker_observer({
/// let mut options = ActiveSpeakerObserverOptions::default();
/// options.interval = 300;
/// options
/// })
/// .await?;
/// # Ok(())
/// # }
/// ```
pub async fn create_active_speaker_observer(
&self,
active_speaker_observer_options: ActiveSpeakerObserverOptions,
) -> Result<ActiveSpeakerObserver, RequestError> {
debug!("create_active_speaker_observer()");

let rtp_observer_id = RtpObserverId::new();

let _buffer_guard = self
.inner
.channel
.buffer_messages_for(rtp_observer_id.into());

self.inner
.channel
.request(RouterCreateActiveSpeakerObserverRequest {
internal: RtpObserverInternal {
router_id: self.inner.id,
rtp_observer_id,
},
data: RouterCreateActiveSpeakerObserverData::from_options(
&active_speaker_observer_options,
),
})
.await?;

let active_speaker_observer = ActiveSpeakerObserver::new(
rtp_observer_id,
Arc::clone(&self.inner.executor),
self.inner.channel.clone(),
active_speaker_observer_options.app_data,
self.clone(),
);

self.inner.handlers.new_rtp_observer.call(|callback| {
callback(NewRtpObserver::ActiveSpeaker(&active_speaker_observer));
});

Ok(active_speaker_observer)
}

/// Pipes [`Producer`] with the given `producer_id` into another [`Router`] on same host.
///
/// # Example
Expand Down

0 comments on commit 4c608ca

Please sign in to comment.