Skip to content

Commit 51a26e6

Browse files
authored
Merge pull request #161 from patrickbkr/remote-window-handling
Implement remote window handling
2 parents 3cb30b3 + 70beb8e commit 51a26e6

File tree

3 files changed

+80
-12
lines changed

3 files changed

+80
-12
lines changed

lib/Cro/HTTP2/ConnectionState.pm6

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,62 @@
11
class Cro::HTTP2::ConnectionState {
2+
class WindowAdd {
3+
has $.stream-identifier;
4+
has $.increment;
5+
}
6+
class WindowConsume {
7+
has $.stream-identifier;
8+
has $.bytes;
9+
has $.promise;
10+
}
11+
class WindowInitial {
12+
has $.initial;
13+
}
214
has Supplier $.settings = Supplier.new;
315
has Supplier $.ping = Supplier.new;
416
has Supplier $.window-size = Supplier.new;
17+
has $!initial-window-size = 65535;
18+
has Supplier $.remote-window-change = Supplier.new;
19+
# Connection represented as stream 0.
20+
has @!remote-window-sizes;
21+
has @!remote-window-consume-queue;
522
has Supplier $.push-promise = Supplier.new;
623
has Supplier $.stream-reset = Supplier.new;
24+
25+
submethod TWEAK() {
26+
sub check-window-size($wc) {
27+
if $wc.bytes <= @!remote-window-sizes[0] &&
28+
$wc.bytes <= @!remote-window-sizes[$wc.stream-identifier] {
29+
@!remote-window-sizes[0] -= $wc.bytes;
30+
@!remote-window-sizes[$wc.stream-identifier] -= $wc.bytes;
31+
$wc.promise.keep;
32+
return True;
33+
}
34+
return False;
35+
}
36+
@!remote-window-sizes[0] = $!initial-window-size;
37+
38+
$!remote-window-change.Supply.tap: {
39+
when WindowAdd {
40+
@!remote-window-sizes[.stream-identifier] = $!initial-window-size without @!remote-window-sizes[.stream-identifier];
41+
@!remote-window-sizes[.stream-identifier] += .increment;
42+
while @!remote-window-consume-queue && check-window-size(@!remote-window-consume-queue[*-1]) {
43+
@!remote-window-consume-queue.pop;
44+
}
45+
}
46+
when WindowConsume {
47+
# For some reason I do not understand Rakudo throws the following error when I inline the
48+
# `.stream-identifier`, even more obscure as it works flawlessly above:
49+
# "No such method 'stream-identifier' for invocant of type 'Any'"
50+
my $stream = .stream-identifier;
51+
@!remote-window-sizes[$stream] = $!initial-window-size without @!remote-window-sizes[$stream];
52+
unless check-window-size($_) {
53+
@!remote-window-consume-queue.push($_)
54+
}
55+
}
56+
when WindowInitial {
57+
@!remote-window-sizes »+=» .initial - $!initial-window-size;
58+
$!initial-window-size = .initial;
59+
}
60+
};
61+
}
762
}

lib/Cro/HTTP2/FrameSerializer.pm6

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,22 @@ class Cro::HTTP2::FrameSerializer does Cro::Transform does Cro::ConnectionState[
2828
}
2929

3030
my $MAX-FRAME-SIZE = 2 ** 14;
31-
sub send-message($frame) {
31+
sub send-message($frame, $consumes-window = False) {
32+
if $consumes-window {
33+
my $promise = Promise.new;
34+
$connection-state.remote-window-change.emit: Cro::HTTP2::ConnectionState::WindowConsume.new(
35+
stream-identifier => $frame.stream-identifier,
36+
bytes => $frame.data.bytes + ($frame.padded ?? 8 + $frame.padding-length !! 0),
37+
:$promise,
38+
);
39+
await $promise;
40+
}
3241
my $result = self!form-header($frame);
3342
self!serializer($result, $frame);
3443
emit Cro::TCP::Message.new(data => $result);
3544
}
3645

37-
sub send-splitted($frame) {
46+
sub send-splitted($frame, $consumes-window = False) {
3847
my $is-header = $frame ~~ Cro::HTTP2::Frame::Headers|Cro::HTTP2::Frame::PushPromise;
3948
my $payload = $is-header ?? $frame.headers !! $frame.data;
4049
my $flag = $frame.flags == 4 ?? 0 !! 1 if $is-header;
@@ -44,7 +53,7 @@ class Cro::HTTP2::FrameSerializer does Cro::Transform does Cro::ConnectionState[
4453
my %arg = $is-header ?? headers => $first-part !! data => $first-part;
4554
send-message($frame.clone(
4655
flags => $is-header ?? $flag !! 0,
47-
|%arg));
56+
|%arg), $consumes-window);
4857

4958
while $payload.elems > 0 {
5059
my $sent = $payload.elems < $MAX-FRAME-SIZE-9
@@ -59,7 +68,7 @@ class Cro::HTTP2::FrameSerializer does Cro::Transform does Cro::ConnectionState[
5968
} else {
6069
$message = Cro::HTTP2::Frame::Data.new(|%arg)
6170
}
62-
send-message($message);
71+
send-message($message, $consumes-window);
6372
$payload .= subbuf($sent);
6473
}
6574
}
@@ -118,9 +127,9 @@ class Cro::HTTP2::FrameSerializer does Cro::Transform does Cro::ConnectionState[
118127
}
119128
} elsif $frame ~~ Cro::HTTP2::Frame::Data {
120129
if $frame.data.elems + 9 > $MAX-FRAME-SIZE {
121-
send-splitted($frame);
130+
send-splitted($frame, True);
122131
} else {
123-
send-message($frame);
132+
send-message($frame, True);
124133
}
125134
} else {
126135
send-message($frame);

lib/Cro/HTTP2/GeneralParser.pm6

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,14 @@ role Cro::HTTP2::GeneralParser does Cro::ConnectionState[Cro::HTTP2::ConnectionS
4444
whenever $connection-state.push-promise.Supply { emit $_ }
4545
whenever $connection-state.settings.Supply {
4646
when Cro::HTTP2::Frame::Settings {
47-
with .settings.grep(*.key == 1) {
48-
my $pair = $_.first;
49-
$decoder.set-dynamic-table-limit($pair.value) if $pair;
47+
with .settings.first(*.key == 1) {
48+
$decoder.set-dynamic-table-limit(.value);
5049
}
51-
with .settings.grep(*.key == 2) {
52-
my $pair = $_.first;
53-
$!enable-push = $pair.value != 0 if $pair;
50+
with .settings.first(*.key == 2) {
51+
$!enable-push = .value != 0;
52+
}
53+
with .settings.first(*.key == 4) {
54+
$connection-state.remote-window-change.emit: Cro::HTTP2::ConnectionState::WindowInitial.new(initial => .value);
5455
}
5556
}
5657
}
@@ -168,6 +169,9 @@ role Cro::HTTP2::GeneralParser does Cro::ConnectionState[Cro::HTTP2::ConnectionS
168169
when Cro::HTTP2::Frame::GoAway {
169170
}
170171
when Cro::HTTP2::Frame::WindowUpdate {
172+
$connection-state.remote-window-change.emit: Cro::HTTP2::ConnectionState::WindowAdd.new:
173+
stream-identifier => .stream-identifier,
174+
increment => .increment;
171175
}
172176
when Cro::HTTP2::Frame::Continuation {
173177
if .stream-identifier > $curr-sid

0 commit comments

Comments
 (0)