Sending from multiple sources to a memory server #382

Open
dignifiedquire opened this issue Oct 20, 2022 · 0 comments

Unfortunately, the current design of the in-memory channels doesn't allow sending from multiple sources, at least as far as I can tell. I tried implementing a custom transport using flume channels but couldn't get it to work; something strange seems to happen once the channels are cloned.

Do you have any thoughts on what the issue could be, or whether this could easily be added to the existing implementation?

Current implementation

use std::pin::Pin;
use std::task::{Context, Poll};

use futures::{Sink, Stream};

/// Returns two channel peers with buffer equal to `capacity`. Each [`Stream`] yields items sent
/// through the other's [`Sink`].
pub fn bounded<SinkItem, Item>(
    capacity: usize,
) -> (Channel<SinkItem, Item>, Channel<Item, SinkItem>)
where
    Item: Send + Sync + 'static,
    SinkItem: Send + Sync + 'static,
{
    let (tx1, rx2) = flume::bounded(capacity);
    let (tx2, rx1) = flume::bounded(capacity);
    (
        Channel {
            tx: tx1.into_sink(),
            rx: rx1.into_stream(),
        },
        Channel {
            tx: tx2.into_sink(),
            rx: rx2.into_stream(),
        },
    )
}

/// A bi-directional channel backed by a [`Sender`](flume::Sender) and [`Receiver`](flume::Receiver).
pub struct Channel<Item, SinkItem>
where
    Item: Send + Sync + 'static,
    SinkItem: Send + Sync + 'static,
{
    rx: flume::r#async::RecvStream<'static, Item>,
    tx: flume::r#async::SendSink<'static, SinkItem>,
}

impl<Item, SinkItem> Clone for Channel<Item, SinkItem>
where
    Item: Send + Sync + 'static,
    SinkItem: Send + Sync + 'static,
{
    fn clone(&self) -> Self {
        Self {
            rx: self.rx.clone(),
            tx: self.tx.clone(),
        }
    }
}

impl<Item, SinkItem> Stream for Channel<Item, SinkItem>
where
    Item: Send + Sync + 'static,
    SinkItem: Send + Sync + 'static,
{
    type Item = Result<Item, ChannelError>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Item, ChannelError>>> {
        Pin::new(&mut self.rx)
            .poll_next(cx)
            .map(|option| option.map(Ok))
    }
}

/// Errors that occur in the sending or receiving of messages over a channel.
#[derive(thiserror::Error, Debug)]
pub enum ChannelError {
    /// An error occurred sending over the channel.
    #[error("an error occurred sending over the channel")]
    Send(#[source] Box<dyn std::error::Error + Send + Sync + 'static>),
}

impl<Item, SinkItem> Sink<SinkItem> for Channel<Item, SinkItem>
where
    Item: Send + Sync + 'static,
    SinkItem: Send + Sync + 'static,
{
    type Error = ChannelError;

    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Pin::new(&mut self.tx)
            .poll_ready(cx)
            .map_err(|e| ChannelError::Send(Box::new(e)))
    }

    fn start_send(mut self: Pin<&mut Self>, item: SinkItem) -> Result<(), Self::Error> {
        Pin::new(&mut self.tx)
            .start_send(item)
            .map_err(|e| ChannelError::Send(Box::new(e)))
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Pin::new(&mut self.tx)
            .poll_flush(cx)
            .map_err(|e| ChannelError::Send(Box::new(e)))
    }

    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Pin::new(&mut self.tx)
            .poll_close(cx)
            .map_err(|e| ChannelError::Send(Box::new(e)))
    }
}
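
One possible explanation, not confirmed here: flume channels are multi-producer, multi-consumer, so cloned receivers compete for messages instead of each getting a copy. If both halves of a `Channel` are cloned, a response may be picked up by a clone other than the one that sent the request. A minimal sketch of one way around that, where every source clones only the sending half and attaches its own reply channel to each request, is below. It uses `std::sync::mpsc` and threads rather than flume or the `Sink`/`Stream` traits, and `Request` and `run` are hypothetical names made up for illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical request type: each request carries its own reply sender,
/// so the server can answer exactly the source that sent it.
struct Request {
    payload: u32,
    reply: mpsc::Sender<u32>,
}

/// Spawns `sources` threads that all send to one server thread and returns
/// `(source, payload, response)` for each request, sorted by source.
fn run(sources: usize) -> Vec<(usize, u32, u32)> {
    let (req_tx, req_rx) = mpsc::channel::<Request>();

    // The "server" owns the only request receiver and doubles each payload.
    let server = thread::spawn(move || {
        for req in req_rx {
            // Ignore the error if the source has already gone away.
            let _ = req.reply.send(req.payload * 2);
        }
    });

    let handles: Vec<_> = (0..sources)
        .map(|source| {
            // Only the sending half is cloned; replies are never shared.
            let req_tx = req_tx.clone();
            thread::spawn(move || {
                let (reply_tx, reply_rx) = mpsc::channel();
                let payload = source as u32 + 1;
                req_tx
                    .send(Request { payload, reply: reply_tx })
                    .expect("server is running");
                let response = reply_rx.recv().expect("server replied");
                (source, payload, response)
            })
        })
        .collect();

    // Drop the original sender so the server loop ends once all sources finish.
    drop(req_tx);

    let mut results: Vec<_> = handles
        .into_iter()
        .map(|handle| handle.join().expect("source thread panicked"))
        .collect();
    server.join().expect("server thread panicked");
    results.sort();
    results
}

fn main() {
    for (source, payload, response) in run(3) {
        println!("source {source}: sent {payload}, got {response}");
    }
}
```

The same shape should carry over to an async transport: one shared request queue into the server, plus a per-source (or per-request) response channel, instead of a single cloned bidirectional pair.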