Skip to content

Commit

Permalink
Native interface now allows multiple connections to be dialed and acc…
Browse files Browse the repository at this point in the history
…epted

It achieves this by making clones of peer_con and handle, which are
Arc<Mutex>>.

This interface is no longer compatible with the wasm one, however.

This addresses wngr#4
partially.
  • Loading branch information
eras committed Dec 29, 2022
1 parent 8625971 commit 1083bb6
Showing 1 changed file with 13 additions and 13 deletions.
26 changes: 13 additions & 13 deletions native/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub struct DataStream {
/// Reference to the PeerConnection to keep around
peer_con: Option<Arc<Mutex<Box<RtcPeerConnection<ConnInternal>>>>>,
/// Reference to the outbound piper
handle: Option<JoinHandle<()>>,
handle: Option<Arc<Mutex<JoinHandle<()>>>>,
}

impl DataStream {
Expand Down Expand Up @@ -191,7 +191,7 @@ impl AsyncWrite for DataStream {
pub struct PeerConnection {
peer_con: Arc<Mutex<Box<RtcPeerConnection<ConnInternal>>>>,
rx_incoming: mpsc::Receiver<DataStream>,
handle: JoinHandle<()>,
handle: Arc<Mutex<JoinHandle<()>>>,
}

impl PeerConnection {
Expand All @@ -211,7 +211,7 @@ impl PeerConnection {
},
)?));
let pc = peer_con.clone();
let handle = tokio::spawn(async move {
let handle = Arc::new(Mutex::new(tokio::spawn(async move {
while let Some(m) = sig_rx.next().await {
if let Err(err) = match m {
Message::RemoteDescription(i) => pc.lock().set_remote_description(&i),
Expand All @@ -220,7 +220,7 @@ impl PeerConnection {
error!(?err, "Error interacting with RtcPeerConnection");
}
}
});
})));
Ok(Self {
peer_con,
rx_incoming,
Expand All @@ -229,30 +229,30 @@ impl PeerConnection {
}

/// Wait for an inbound connection.
pub async fn accept(mut self) -> anyhow::Result<DataStream> {
pub async fn accept(&mut self) -> anyhow::Result<DataStream> {
let mut s = self.rx_incoming.next().await.context("Tx dropped")?;
s.handle = Some(self.handle);
s.peer_con = Some(self.peer_con);
s.handle = Some(self.handle.clone());
s.peer_con = Some(self.peer_con.clone());
Ok(s)
}

/// Initiate an outbound dialing.
pub async fn dial(self, label: &str) -> anyhow::Result<DataStream> {
pub async fn dial(&self, label: &str) -> anyhow::Result<DataStream> {
let (mut ready, rx_inbound, chan) = DataChannel::new();
let dc = self.peer_con.lock().create_data_channel(label, chan)?;
ready.next().await.context("Tx dropped")??;
Ok(DataStream {
inner: dc,
rx_inbound,
buf_inbound: vec![],
handle: Some(self.handle),
peer_con: Some(self.peer_con),
handle: Some(self.handle.clone()),
peer_con: Some(self.peer_con.clone()),
})
}

/// Initiate an outbound dialing with extra options.
pub async fn dial_ex(
self,
&self,
label: &str,
dc_init: &DataChannelInit,
) -> anyhow::Result<DataStream> {
Expand All @@ -266,8 +266,8 @@ impl PeerConnection {
inner: dc,
rx_inbound,
buf_inbound: vec![],
handle: Some(self.handle),
peer_con: Some(self.peer_con),
handle: Some(self.handle.clone()),
peer_con: Some(self.peer_con.clone()),
})
}
}
Expand Down

0 comments on commit 1083bb6

Please sign in to comment.