Skip to content

Commit

Permalink
Add body parser/serializer setting in websocket
Browse files Browse the repository at this point in the history
So that you can configure these when using the `websocket` function.
This means that you can now get and send messages over a websocket
with, for example, a JSON body and not have to deal with that in the
handler code.
  • Loading branch information
jnthn committed Jan 31, 2018
1 parent d53f423 commit 80de27b
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 35 deletions.
3 changes: 2 additions & 1 deletion META6.json
Expand Up @@ -6,7 +6,8 @@
"tags" : [ "WebSocket", "Client", "Server" ],
"authors" : [ "Jonathan Worthington <jnthn@jnthn.net>" ],
"license" : "Artistic-2.0",
"depends" : [ "Cro::HTTP", "Base64", "Digest::SHA1::Native", "Crypt::Random" ],
"depends" : [ "Cro::HTTP", "Base64", "Digest::SHA1::Native", "Crypt::Random",
"JSON::Fast" ],
"provides" : {
"Cro::HTTP::Router::WebSocket": "lib/Cro/HTTP/Router/WebSocket.pm6",
"Cro::WebSocket::BodyParsers": "lib/Cro/WebSocket/BodyParsers.pm6",
Expand Down
53 changes: 52 additions & 1 deletion lib/Cro/HTTP/Router/WebSocket.pm6
@@ -1,5 +1,7 @@
use Base64;
use Digest::SHA1::Native;
use Cro::BodyParserSelector;
use Cro::BodySerializerSelector;
use Cro::HTTP::Router;
use Cro::Transform;
use Cro::TCP;
Expand All @@ -9,7 +11,45 @@ use Cro::WebSocket::FrameSerializer;
use Cro::WebSocket::MessageParser;
use Cro::WebSocket::MessageSerializer;

sub web-socket(&handler) is export {
my class SetBodyParsers does Cro::Transform {
has $!selector;

method BUILD(:$body-parsers --> Nil) {
$!selector = Cro::BodyParserSelector::List.new:
parsers => $body-parsers.list;
}

method consumes() { Cro::WebSocket::Message }
method produces() { Cro::WebSocket::Message }

method transformer(Supply $in --> Supply) {
supply whenever $in {
.body-parser-selector = $!selector;
.emit;
}
}
}

my class SetBodySerializers does Cro::Transform {
has $!selector;

method BUILD(:$body-serializers --> Nil) {
$!selector = Cro::BodySerializerSelector::List.new:
serializers => $body-serializers.list;
}

method consumes() { Cro::WebSocket::Message }
method produces() { Cro::WebSocket::Message }

method transformer(Supply $in --> Supply) {
supply whenever $in {
.body-serializer-selector = $!selector;
.emit;
}
}
}

sub web-socket(&handler, :$body-parsers, :$body-serializers) is export {
my constant $magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

my $request = request;
Expand All @@ -31,6 +71,15 @@ sub web-socket(&handler) is export {
return;
};

my @before;
unless $body-parsers === Any {
push @before, SetBodyParsers.new(:$body-parsers);
}
my @after;
unless $body-serializers === Any {
unshift @after, SetBodySerializers.new(:$body-serializers);
}

my $key = $request.header('sec-websocket-key');

$response.status = 101;
Expand All @@ -42,7 +91,9 @@ sub web-socket(&handler) is export {
label => "WebSocket Handler",
Cro::WebSocket::FrameParser.new(:mask-required),
Cro::WebSocket::MessageParser.new,
|@before,
Cro::WebSocket::Handler.new(&handler),
|@after,
Cro::WebSocket::MessageSerializer.new,
Cro::WebSocket::FrameSerializer.new(:!mask)
);
Expand Down
15 changes: 15 additions & 0 deletions lib/Cro/WebSocket/BodyParsers.pm6
@@ -1,4 +1,5 @@
use Cro::BodyParser;
use JSON::Fast;

class Cro::WebSocket::BodyParser::Text does Cro::BodyParser {
method is-applicable($message) {
Expand All @@ -19,3 +20,17 @@ class Cro::WebSocket::BodyParser::Binary does Cro::BodyParser {
$message.body-blob
}
}

class Cro::WebSocket::BodyParser::JSON does Cro::BodyParser {
method is-applicable($message) {
# We presume that if this body parser has been installed, then we will
# always be doing JSON
True
}

method parse($message) {
$message.body-blob.then: -> $blob-promise {
from-json $blob-promise.result.decode('utf-8')
}
}
}
16 changes: 15 additions & 1 deletion lib/Cro/WebSocket/BodySerializers.pm6
@@ -1,5 +1,6 @@
use Cro::BodySerializer;
use Cro::WebSocket::Message::Opcode;
use JSON::Fast;

class Cro::WebSocket::BodySerializer::Text does Cro::BodySerializer {
method is-applicable($message, $body) {
Expand All @@ -8,7 +9,7 @@ class Cro::WebSocket::BodySerializer::Text does Cro::BodySerializer {

method serialize($message, $body) {
$message.opcode = Text;
supply emit $body.encode('utf-8');
supply emit $body.encode('utf-8')
}
}

Expand All @@ -22,3 +23,16 @@ class Cro::WebSocket::BodySerializer::Binary does Cro::BodySerializer {
supply emit $blob
}
}

class Cro::WebSocket::BodySerializer::JSON does Cro::BodySerializer {
method is-applicable($message, $body) {
# We presume that if this body serializer has been installed, then we
# will always be doing JSON
True
}

method serialize($message, $body) {
$message.opcode = Text;
supply emit to-json($body).encode('utf-8')
}
}
6 changes: 4 additions & 2 deletions lib/Cro/WebSocket/Handler.pm6
Expand Up @@ -43,13 +43,15 @@ class Cro::WebSocket::Handler does Cro::Transform {

when Cro::WebSocket::Message {
emit $_;
if $_.opcode == Cro::WebSocket::Message::Close {
if .opcode == Cro::WebSocket::Message::Close {
keep-close-promise();
$end = True;
done;
}
}
when Blob|Str|Supply { emit Cro::WebSocket::Message.new($_) }
default {
emit Cro::WebSocket::Message.new($_)
}

LAST {
close($end, Blob.new([3, 232])); # bytes of 1000
Expand Down
4 changes: 2 additions & 2 deletions lib/Cro/WebSocket/Message.pm6
Expand Up @@ -8,13 +8,13 @@ use Cro::WebSocket::Message::Opcode;
class Cro::WebSocket::Message does Cro::MessageWithBody {
has Cro::WebSocket::Message::Opcode $.opcode is rw;
has Bool $.fragmented;
has Cro::BodyParserSelector $.body-parser-selector =
has Cro::BodyParserSelector $.body-parser-selector is rw =
Cro::BodyParserSelector::List.new:
:parsers[
Cro::WebSocket::BodyParser::Text,
Cro::WebSocket::BodyParser::Binary
];
has Cro::BodySerializerSelector $.body-serializer-selector =
has Cro::BodySerializerSelector $.body-serializer-selector is rw =
Cro::BodySerializerSelector::List.new:
:serializers[
Cro::WebSocket::BodySerializer::Text,
Expand Down
89 changes: 61 additions & 28 deletions t/http-router-websocket.t
Expand Up @@ -2,6 +2,8 @@ use Cro::HTTP::Client;
use Cro::HTTP::Router::WebSocket;
use Cro::HTTP::Router;
use Cro::HTTP::Server;
use Cro::WebSocket::BodyParsers;
use Cro::WebSocket::BodySerializers;
use Cro::WebSocket::Client;
use Test;

Expand All @@ -15,40 +17,71 @@ my $app = route {
}
}
}
}

my $http-server = Cro::HTTP::Server.new(port => 3006,
application => $app);
get -> 'parser-serializer' {
web-socket
:body-parsers(Cro::WebSocket::BodyParser::JSON),
:body-serializers(Cro::WebSocket::BodySerializer::JSON),
-> $incoming {
supply whenever $incoming -> $message {
my $body = await $message.body;
$body<added> = 42;
$body<updated>++;
emit $body;
}
}
}
}

my $http-server = Cro::HTTP::Server.new(port => 3006, application => $app);
$http-server.start();
END $http-server.stop();

throws-like { await Cro::HTTP::Client.get('http://localhost:3006/chat') },
X::Cro::HTTP::Error::Client, 'Connection is not upgraded, 400 Bad Request';

my $c = await Cro::WebSocket::Client.connect: 'http://localhost:3006/chat';

my $p = Promise.new;
my %seen;
$c.messages.tap:
-> $m {
%seen{await $m.body-text}++;
$p.keep if %seen == 3;
},
quit => {
.note;
exit(1);
};

$c.send('Hello');
$c.send('Good');
$c.send('Wow');

await Promise.anyof(Promise.in(5), $p);
ok $p.status == Kept, 'All expected responses were received';
ok %seen{'You said: Hello'}:exists, 'Got first message response';
ok %seen{'You said: Good'}:exists, 'Got second message response';
ok %seen{'You said: Wow'}:exists, 'Got third message response';

$http-server.stop();
{
my $c = await Cro::WebSocket::Client.connect: 'http://localhost:3006/chat';

my $p = Promise.new;
my %seen;
$c.messages.tap:
-> $m {
%seen{await $m.body-text}++;
$p.keep if %seen == 3;
},
quit => {
.note;
exit(1);
};

$c.send('Hello');
$c.send('Good');
$c.send('Wow');

await Promise.anyof(Promise.in(5), $p);
ok $p.status == Kept, 'All expected responses were received';
ok %seen{'You said: Hello'}:exists, 'Got first message response';
ok %seen{'You said: Good'}:exists, 'Got second message response';
ok %seen{'You said: Wow'}:exists, 'Got third message response';

$c.close;
}

{
use JSON::Fast;

my $c = await Cro::WebSocket::Client.connect: 'http://localhost:3006/parser-serializer';
my $reply-promise = $c.messages.head.Promise;
$c.send(to-json({ updated => 99, kept => 'xxx' }));
my $reply = await $reply-promise;
my $parsed;
lives-ok { $parsed = from-json await $reply.body-text },
'Get back valid JSON from websocket endpoint with JSON parser/serializer endpoint';
is $parsed<updated>, 100, 'Expected data returned (1)';
is $parsed<kept>, 'xxx', 'Expected data returned (2)';
is $parsed<added>, 42, 'Expected data returned (3)';
$c.close;
}

done-testing;

0 comments on commit 80de27b

Please sign in to comment.