Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modernize a-chat #419

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 27 additions & 29 deletions examples/a-chat/client.rs
Original file line number Diff line number Diff line change
@@ -1,45 +1,43 @@
use futures::select;
use futures::FutureExt;
use std::sync::Arc;

use async_std::{
io::{stdin, BufReader},
net::{TcpStream, ToSocketAddrs},
prelude::*,
task,
future::select,
};


type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

pub(crate) fn main() -> Result<()> {
task::block_on(try_main("127.0.0.1:8080"))
}

async fn try_main(addr: impl ToSocketAddrs) -> Result<()> {
let stream = TcpStream::connect(addr).await?;
let (reader, mut writer) = (&stream, &stream);
let reader = BufReader::new(reader);
let mut lines_from_server = futures::StreamExt::fuse(reader.lines());

let stdin = BufReader::new(stdin());
let mut lines_from_stdin = futures::StreamExt::fuse(stdin.lines());
loop {
select! {
line = lines_from_server.next().fuse() => match line {
Some(line) => {
let line = line?;
println!("{}", line);
},
None => break,
},
line = lines_from_stdin.next().fuse() => match line {
Some(line) => {
let line = line?;
writer.write_all(line.as_bytes()).await?;
writer.write_all(b"\n").await?;
}
None => break,
}
}
}
Ok(())
let stream = Arc::new(TcpStream::connect(addr).await?);
let (reader, writer) = (stream.clone(), stream.clone());

let incoming = task::spawn(async move {
let mut messages = BufReader::new(&*reader).lines();
while let Some(message) = messages.next().await {
let message = message?;
println!("{}", message);
}
Ok(())
});

let outgoing = task::spawn(async move {
let mut stdin = BufReader::new(stdin()).lines();

while let Some(line) = stdin.next().await {
let line = line?;
yoshuawuyts marked this conversation as resolved.
Show resolved Hide resolved
let message = format!("{}\n", line);
(&*writer).write_all(message.as_bytes()).await?;
}
Ok(())
});

select!(incoming, outgoing).await
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think ideally we should either join both tasks, or cancel the one that is still running. And look like hard-cancelling would a wrong thing to do, as it could cancel a task mid-line.

The solution with one select loop has an interesting propery that it guarantees "atomicity" of send/receive operations.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But, given that this is a quick&dirty CLI client, I don't think it's super important to really care here.

}
100 changes: 54 additions & 46 deletions examples/a-chat/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,16 @@ use std::{
sync::Arc,
};

use futures::{channel::mpsc, select, FutureExt, SinkExt};

use async_std::{
io::BufReader,
net::{TcpListener, TcpStream, ToSocketAddrs},
prelude::*,
task,
sync::{channel, Sender, Receiver},
stream,
};

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
type Sender<T> = mpsc::UnboundedSender<T>;
type Receiver<T> = mpsc::UnboundedReceiver<T>;

#[derive(Debug)]
enum Void {}
Expand All @@ -26,7 +24,7 @@ pub(crate) fn main() -> Result<()> {
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
let listener = TcpListener::bind(addr).await?;

let (broker_sender, broker_receiver) = mpsc::unbounded();
let (broker_sender, broker_receiver) = channel(10);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have never understood how one is supposed to pick buffer size properly. Maybe the correct choice here is actually 0 capacity? Queuing clients seems bad for latency, and we can't deadlock here, so not using a buffer should be ok.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe @stjepang can answer that?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should not think about buffer capacities too much. Capacities of 1, 10, and 100 are all valid here, it doesn't really matter.

Ideally, we'd use capacity of 0 as a safe default choice (like Go does, i.e. make(chan int) constructs a 0-capacity channel). However, the problem is async channels based on futures can't really have capacity 0.

A channel can truly have capacity of 0 only in the context of preemptible threads. With Go channels and crossbeam-channel, send and receive operation need to pair up, at which point we flip an atomic value and both sending and receiving side agree that a message has been sent.

With futures-based channel, we can't do that. Imagine a receive operation is pending and registered in the channel. Then comes a send operation and sends a message. What happens if the receiving side then wakes up and cancels its receive operation? Did the message get through or not? Doesn't matter if your answer is "yes" or "no", we'll run into some inconsistencies either way. I guess the point is that we can only create an illusion of 0-capacity channels, but the channel will in some ways behave as if the capacity was 1.

let broker = task::spawn(broker_loop(broker_receiver));
let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
Expand All @@ -39,7 +37,7 @@ async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
Ok(())
}

async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
async fn connection_loop(broker: Sender<Event>, stream: TcpStream) -> Result<()> {
let stream = Arc::new(stream);
let reader = BufReader::new(&*stream);
let mut lines = reader.lines();
Expand All @@ -48,15 +46,14 @@ async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result
None => return Err("peer disconnected immediately".into()),
Some(line) => line?,
};
let (_shutdown_sender, shutdown_receiver) = mpsc::unbounded::<Void>();
let (_shutdown_sender, shutdown_receiver) = channel::<Void>(0);
broker
.send(Event::NewPeer {
name: name.clone(),
stream: Arc::clone(&stream),
shutdown: shutdown_receiver,
})
.await
.unwrap();
.await;

while let Some(line) = lines.next().await {
let line = line?;
Expand All @@ -76,28 +73,36 @@ async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result
to: dest,
msg,
})
.await
.unwrap();
.await;
}

Ok(())
}

#[derive(Debug)]
enum ConnectionWriterEvent {
Message(String),
Shutdown
}

async fn connection_writer_loop(
messages: &mut Receiver<String>,
stream: Arc<TcpStream>,
mut shutdown: Receiver<Void>,
shutdown: Receiver<Void>,
) -> Result<()> {
let mut stream = &*stream;
loop {
select! {
msg = messages.next().fuse() => match msg {
Some(msg) => stream.write_all(msg.as_bytes()).await?,
None => break,
},
void = shutdown.next().fuse() => match void {
Some(void) => match void {},
None => break,
let messages = messages.map(ConnectionWriterEvent::Message);
let shutdown = shutdown.map(|_| ConnectionWriterEvent::Shutdown).chain(stream::once(ConnectionWriterEvent::Shutdown));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's smart! Although I wonder if there's some simpler cooperative shutdown idiom...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I think I've conjured something up:

fn with_shutdown<S, T>(stream: S, shutdown: Receiver<Void>) -> impl Stream<Item=T>
where
    S: Stream<Item=T>,
    T: Unpin,
{
    let items = stream.map(Some);
    let shutdown = shutdown.map(|void| match void {}).chain(stream::once(None));
    items.merge(shutdown).scan((), |&mut (), item| item)
}

The loop can then be written as

let mut messages = with_shutdown(messages, shutdown);
while let Some(msg) = messages.next().await {

}

Note that this also allows pushing cancellation completely out of this function and onto the call site (at the cost of making this generic over messages stream)!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think with_shutdown should probably take shutdown: impl Future<Item = !> rather than a specific receiver

Copy link
Contributor

@yoshuawuyts yoshuawuyts Nov 1, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@matklad I feel like the with_shutdown method you've written here might be a bit daunting for people trying to understand everything that's going on. I don't quite understand what shutdown.map(|void| match void {}) does, though I do trust it's correct.

I feel like in the case of this example we should probably try to err on keeping code as simple as possible. Never types, and empty matches could be tricky for people to pick up on. I think keeping what is in the PR already would be easier to follow for most people.

Copy link

@ghost ghost Nov 1, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same! I see how the "void" pattern is useful but is definitely something that is pretty unusual and gives me pause :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I've managed to make cancellation in this case rather easy, at the cost of building a 75-lines long stop_source/stop_token library: matklad@ab02901. I feel this is roughly the place where we want to end-up eventually, but I am not sure we want to do this for tutorial.

I am genuinely don't know what to do with tutorial, I have how there's no simple solution :(

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Met up with @skade in person today, and we had some discussions about this. I think we ended up with two conclusions:

  • We probably need a counterpart to Stream::merge that interleaves 2 streams, but drops them once one of them is exhausted.
  • We should look into writing a cancellation library that works for both futures and streams. We bounced around some ideas that were based on the pattern used in this PR. There's probably some design space we could explore here.

Regarding this PR. I think focusing on making it as understandable as possible might be the right direction for now. And once we figure out a more formal strategy for cancellation we can apply that in a follow-up PR.

How does that sound?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM, I’ve sort-of reaches similar conclusions, specifically:

  • merge shortest might be an interesting primitive, thought, in this particular case, it might be enough if merge drops a substrram that is exhausted (see Chain iterator adaptor shold drop exhausted subiterator rust-lang/rust#66031 for minimizes example)

  • cancellation should be in a library, and, as design space is large, it should be outside of async std at least for start. I believe last 75 lines of matklad@ab02901 might be a good start actually, should I just publish that to crates.io?

  • for this PR, I wonder if we should rip out cancellation completely?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for this PR, I wonder if we should rip out cancellation completely?

That's definitely an option I think. It's a very cool system to show off though, so I feel like it's a bit of a loss. But given the circumstance maybe it's for the best.

Also yeah having it on crates.io would be neat; big fan of publishing things!

Copy link
Member

@matklad matklad Nov 3, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yoshuawuyts published https://github.com/async-rs/stop-token. You are also an owner on crates-io (I was surprised that there's no async-rs-publishes group to which I can grant permissions). There's a high change that I won't be able to properly maintain this crate, but docs are very upfront that it is an experiment, and it's also in a finished state already, so there's hopefully little to maintain (well, until folks ask for linked tokens and cancellation callbacks...)


let mut events = shutdown.merge(messages);

while let Some(event) = events.next().await {
match event {
ConnectionWriterEvent::Message(msg) => {
stream.write_all(msg.as_bytes()).await?;
}
ConnectionWriterEvent::Shutdown => {
break
}
}
}
Expand All @@ -118,58 +123,61 @@ enum Event {
},
}

async fn broker_loop(mut events: Receiver<Event>) {
let (disconnect_sender, mut disconnect_receiver) =
mpsc::unbounded::<(String, Receiver<String>)>();
#[derive(Debug)]
enum BrokerEvent {
ClientEvent(Event),
Disconnection((String, Receiver<String>)),
Shutdown,
}

async fn broker_loop(events: Receiver<Event>) {
let (disconnect_sender, disconnect_receiver) = channel(10);

let mut peers: HashMap<String, Sender<String>> = HashMap::new();
let disconnect_receiver = disconnect_receiver.map(BrokerEvent::Disconnection);
let events = events.map(BrokerEvent::ClientEvent).chain(stream::once(BrokerEvent::Shutdown));

loop {
let event = select! {
event = events.next().fuse() => match event {
None => break,
Some(event) => event,
},
disconnect = disconnect_receiver.next().fuse() => {
let (name, _pending_messages) = disconnect.unwrap();
assert!(peers.remove(&name).is_some());
continue;
},
};
let mut stream = disconnect_receiver.merge(events);

while let Some(event) = stream.next().await {
match event {
Event::Message { from, to, msg } => {
BrokerEvent::ClientEvent(Event::Message { from, to, msg }) => {
for addr in to {
if let Some(peer) = peers.get_mut(&addr) {
let msg = format!("from {}: {}\n", from, msg);
peer.send(msg).await.unwrap();
peer.send(msg).await;
}
}
}
Event::NewPeer {
BrokerEvent::ClientEvent(Event::NewPeer {
name,
stream,
shutdown,
} => match peers.entry(name.clone()) {
}) => match peers.entry(name.clone()) {
Entry::Occupied(..) => (),
Entry::Vacant(entry) => {
let (client_sender, mut client_receiver) = mpsc::unbounded();
let (client_sender, mut client_receiver) = channel(10);
entry.insert(client_sender);
let mut disconnect_sender = disconnect_sender.clone();
let disconnect_sender = disconnect_sender.clone();
spawn_and_log_error(async move {
let res =
connection_writer_loop(&mut client_receiver, stream, shutdown).await;
disconnect_sender
.send((name, client_receiver))
.await
.unwrap();
.await;
res
});
}
},
}
BrokerEvent::Disconnection((name, _pending_messages)) => {
assert!(peers.remove(&name).is_some());
}
BrokerEvent::Shutdown => break,
}
}
drop(peers);
drop(disconnect_sender);
while let Some((_name, _pending_messages)) = disconnect_receiver.next().await {}
while let Some(BrokerEvent::Disconnection((_name, _pending_messages))) = stream.next().await {}
}

fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
Expand Down