Skip to content

Commit

Permalink
More correct web socket close Promise handling
Browse files Browse the repository at this point in the history
A recent commit forced processing of messages sent to the web socket
handler to happen asynchronously, to avoid deadlocks when an `await`
of the message body would prevent incoming frames to be processed in
multi-frame messages. Unfortunately, that could cause the close
Promise to be kept before all message processing was done. This
change makes sure that won't happen.
  • Loading branch information
jnthn committed Feb 1, 2018
1 parent bac66d8 commit 622fcd3
Showing 1 changed file with 28 additions and 14 deletions.
42 changes: 28 additions & 14 deletions lib/Cro/WebSocket/Handler.pm6
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,40 @@ class Cro::WebSocket::Handler does Cro::Transform {
}
}

my class CloseMessage {
has $.message;
}
my $block-feed = $supplier.Supply.Channel.Supply.grep: -> $msg {
if $msg ~~ CloseMessage {
$msg.defined
?? keep-close-promise($msg.message)
!! keep-close-promise();
False
}
else {
True
}
}
my $block = &!block.count == 1
?? &!block($supplier.Supply.Channel.Supply)
!! &!block($supplier.Supply.Channel.Supply, $on-close);
?? &!block($block-feed)
!! &!block($block-feed, $on-close);

whenever $block {
sub close(Bool $end, Blob $code) {
unless $end {
emit Cro::WebSocket::Message.new(
opcode => Cro::WebSocket::Message::Close,
fragmented => False,
body-byte-stream => supply { emit $code });
keep-close-promise();
done;
}
sub close(Bool $end, Blob $code) {
unless $end {
emit Cro::WebSocket::Message.new(
opcode => Cro::WebSocket::Message::Close,
fragmented => False,
body-byte-stream => supply { emit $code });
$supplier.emit(CloseMessage);
done;
}
}

whenever $block {
when Cro::WebSocket::Message {
emit $_;
if .opcode == Cro::WebSocket::Message::Close {
keep-close-promise();
$supplier.emit(CloseMessage);
$end = True;
done;
}
Expand Down Expand Up @@ -83,7 +97,7 @@ class Cro::WebSocket::Handler does Cro::Transform {
emit (await $m.body-blob);
done;
});
keep-close-promise($m);
$supplier.emit(CloseMessage.new(message => $m));
$supplier.done;
}
default {}
Expand Down

0 comments on commit 622fcd3

Please sign in to comment.