diff --git a/examples/a-chat/client.rs b/examples/a-chat/client.rs index 48634ba03..92501fc86 100644 --- a/examples/a-chat/client.rs +++ b/examples/a-chat/client.rs @@ -1,13 +1,14 @@ -use futures::select; -use futures::FutureExt; +use std::sync::Arc; use async_std::{ io::{stdin, BufReader}, net::{TcpStream, ToSocketAddrs}, prelude::*, task, + future::select, }; + type Result = std::result::Result>; pub(crate) fn main() -> Result<()> { @@ -15,31 +16,28 @@ pub(crate) fn main() -> Result<()> { } async fn try_main(addr: impl ToSocketAddrs) -> Result<()> { - let stream = TcpStream::connect(addr).await?; - let (reader, mut writer) = (&stream, &stream); - let reader = BufReader::new(reader); - let mut lines_from_server = futures::StreamExt::fuse(reader.lines()); - - let stdin = BufReader::new(stdin()); - let mut lines_from_stdin = futures::StreamExt::fuse(stdin.lines()); - loop { - select! { - line = lines_from_server.next().fuse() => match line { - Some(line) => { - let line = line?; - println!("{}", line); - }, - None => break, - }, - line = lines_from_stdin.next().fuse() => match line { - Some(line) => { - let line = line?; - writer.write_all(line.as_bytes()).await?; - writer.write_all(b"\n").await?; - } - None => break, - } - } - } - Ok(()) + let stream = Arc::new(TcpStream::connect(addr).await?); + let (reader, writer) = (stream.clone(), stream.clone()); + + let incoming = task::spawn(async move { + let mut messages = BufReader::new(&*reader).lines(); + while let Some(message) = messages.next().await { + let message = message?; + println!("{}", message); + } + Ok(()) + }); + + let outgoing = task::spawn(async move { + let mut stdin = BufReader::new(stdin()).lines(); + + while let Some(line) = stdin.next().await { + let line = line?; + let message = format!("{}\n", line); + (&*writer).write_all(message.as_bytes()).await?; + } + Ok(()) + }); + + select!(incoming, outgoing).await } diff --git a/examples/a-chat/server.rs b/examples/a-chat/server.rs index e049a490e..738bec31e 100644 --- a/examples/a-chat/server.rs +++ b/examples/a-chat/server.rs @@ -3,18 +3,16 @@ use std::{ sync::Arc, }; -use futures::{channel::mpsc, select, FutureExt, SinkExt}; - use async_std::{ io::BufReader, net::{TcpListener, TcpStream, ToSocketAddrs}, prelude::*, task, + sync::{channel, Sender, Receiver}, + stream, }; type Result = std::result::Result>; -type Sender = mpsc::UnboundedSender; -type Receiver = mpsc::UnboundedReceiver; #[derive(Debug)] enum Void {} @@ -26,7 +24,7 @@ pub(crate) fn main() -> Result<()> { async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> { let listener = TcpListener::bind(addr).await?; - let (broker_sender, broker_receiver) = mpsc::unbounded(); + let (broker_sender, broker_receiver) = channel(10); let broker = task::spawn(broker_loop(broker_receiver)); let mut incoming = listener.incoming(); while let Some(stream) = incoming.next().await { @@ -39,7 +37,7 @@ async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> { Ok(()) } -async fn connection_loop(mut broker: Sender, stream: TcpStream) -> Result<()> { +async fn connection_loop(broker: Sender, stream: TcpStream) -> Result<()> { let stream = Arc::new(stream); let reader = BufReader::new(&*stream); let mut lines = reader.lines(); @@ -48,15 +46,14 @@ async fn connection_loop(mut broker: Sender, stream: TcpStream) -> Result None => return Err("peer disconnected immediately".into()), Some(line) => line?, }; - let (_shutdown_sender, shutdown_receiver) = mpsc::unbounded::(); + let (_shutdown_sender, shutdown_receiver) = channel::(0); broker .send(Event::NewPeer { name: name.clone(), stream: Arc::clone(&stream), shutdown: shutdown_receiver, }) - .await - .unwrap(); + .await; while let Some(line) = lines.next().await { let line = line?; @@ -76,28 +73,36 @@ async fn connection_loop(mut broker: Sender, stream: TcpStream) -> Result to: dest, msg, }) - .await - .unwrap(); + .await; } Ok(()) } +#[derive(Debug)] +enum ConnectionWriterEvent { + Message(String), + Shutdown +} + async fn connection_writer_loop( messages: &mut Receiver, stream: Arc, - mut shutdown: Receiver, + shutdown: Receiver, ) -> Result<()> { let mut stream = &*stream; - loop { - select! { - msg = messages.next().fuse() => match msg { - Some(msg) => stream.write_all(msg.as_bytes()).await?, - None => break, - }, - void = shutdown.next().fuse() => match void { - Some(void) => match void {}, - None => break, + let messages = messages.map(ConnectionWriterEvent::Message); + let shutdown = shutdown.map(|_| ConnectionWriterEvent::Shutdown).chain(stream::once(ConnectionWriterEvent::Shutdown)); + + let mut events = shutdown.merge(messages); + + while let Some(event) = events.next().await { + match event { + ConnectionWriterEvent::Message(msg) => { + stream.write_all(msg.as_bytes()).await?; + } + ConnectionWriterEvent::Shutdown => { + break } } } @@ -118,58 +123,61 @@ enum Event { }, } -async fn broker_loop(mut events: Receiver) { - let (disconnect_sender, mut disconnect_receiver) = - mpsc::unbounded::<(String, Receiver)>(); +#[derive(Debug)] +enum BrokerEvent { + ClientEvent(Event), + Disconnection((String, Receiver)), + Shutdown, +} + +async fn broker_loop(events: Receiver) { + let (disconnect_sender, disconnect_receiver) = channel(10); + let mut peers: HashMap> = HashMap::new(); + let disconnect_receiver = disconnect_receiver.map(BrokerEvent::Disconnection); + let events = events.map(BrokerEvent::ClientEvent).chain(stream::once(BrokerEvent::Shutdown)); - loop { - let event = select! { - event = events.next().fuse() => match event { - None => break, - Some(event) => event, - }, - disconnect = disconnect_receiver.next().fuse() => { - let (name, _pending_messages) = disconnect.unwrap(); - assert!(peers.remove(&name).is_some()); - continue; - }, - }; + let mut stream = disconnect_receiver.merge(events); + + while let Some(event) = stream.next().await { match event { - Event::Message { from, to, msg } => { + BrokerEvent::ClientEvent(Event::Message { from, to, msg }) => { for addr in to { if let Some(peer) = peers.get_mut(&addr) { let msg = format!("from {}: {}\n", from, msg); - peer.send(msg).await.unwrap(); + peer.send(msg).await; } } } - Event::NewPeer { + BrokerEvent::ClientEvent(Event::NewPeer { name, stream, shutdown, - } => match peers.entry(name.clone()) { + }) => match peers.entry(name.clone()) { Entry::Occupied(..) => (), Entry::Vacant(entry) => { - let (client_sender, mut client_receiver) = mpsc::unbounded(); + let (client_sender, mut client_receiver) = channel(10); entry.insert(client_sender); - let mut disconnect_sender = disconnect_sender.clone(); + let disconnect_sender = disconnect_sender.clone(); spawn_and_log_error(async move { let res = connection_writer_loop(&mut client_receiver, stream, shutdown).await; disconnect_sender .send((name, client_receiver)) - .await - .unwrap(); + .await; res }); } - }, + } + BrokerEvent::Disconnection((name, _pending_messages)) => { + assert!(peers.remove(&name).is_some()); + } + BrokerEvent::Shutdown => break, } } drop(peers); drop(disconnect_sender); - while let Some((_name, _pending_messages)) = disconnect_receiver.next().await {} + while let Some(BrokerEvent::Disconnection((_name, _pending_messages))) = stream.next().await {} } fn spawn_and_log_error(fut: F) -> task::JoinHandle<()>