diff --git a/crates/kitsune_p2p/proxy/src/inner_listen.rs b/crates/kitsune_p2p/proxy/src/inner_listen.rs index 6e6f76516a..9928187213 100644 --- a/crates/kitsune_p2p/proxy/src/inner_listen.rs +++ b/crates/kitsune_p2p/proxy/src/inner_listen.rs @@ -210,6 +210,8 @@ ghost_actor::ghost_chan! { futures::channel::mpsc::Receiver, ); + fn prune_bad_proxy_to(proxy_url: ProxyUrl) -> (); + fn register_proxy_to(proxy_url: ProxyUrl, base_url: url2::Url2) -> (); fn req_proxy(proxy_url: ProxyUrl) -> (); @@ -371,14 +373,38 @@ impl InternalHandler for InnerListen { // and the channel create will re-use that. // If it is not, it will try to create a new connection that may fail. let fut = match proxy_to { - None => self - .i_s - .create_low_level_channel(dest_proxy_url.as_base().clone()), + None => { + tracing::warn!("Dropping message for {}", dest_proxy_url.as_full_str()); + return Ok(async move { + write + .send(ProxyWire::failure(format!( + "Dropped message to {}", + dest_proxy_url.as_full_str() + ))) + .await + .map_err(TransportError::other)?; + Ok(()) + } + .boxed() + .into()); + } Some(proxy_to) => self.i_s.create_low_level_channel(proxy_to), }; + let i_s = self.i_s.clone(); Ok(async move { - let (mut fwd_write, fwd_read) = match fut.await { + let url = dest_proxy_url.clone(); + let res = async move { + let (mut fwd_write, fwd_read) = fut.await?; + fwd_write + .send(ProxyWire::chan_new(url.into())) + .await + .map_err(TransportError::other)?; + TransportResult::Ok((fwd_write, fwd_read)) + } + .await; + let (fwd_write, fwd_read) = match res { Err(e) => { + let _ = i_s.prune_bad_proxy_to(dest_proxy_url).await; write .send(ProxyWire::failure(format!("{:?}", e))) .await @@ -387,10 +413,6 @@ impl InternalHandler for InnerListen { } Ok(t) => t, }; - fwd_write - .send(ProxyWire::chan_new(dest_proxy_url.clone().into())) - .await - .map_err(TransportError::other)?; cross_join_channel_forward(fwd_write, read); cross_join_channel_forward(write, fwd_read); Ok(()) @@ -417,6 +439,11 @@ impl InternalHandler for InnerListen { .into()) } + fn handle_prune_bad_proxy_to(&mut self, proxy_url: ProxyUrl) -> InternalHandlerResult<()> { + self.proxy_list.remove(&proxy_url); + Ok(async move { Ok(()) }.boxed().into()) + } + #[tracing::instrument(skip(self))] fn handle_register_proxy_to( &mut self,