Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Spec 'on' more elaborately
Add "index" feature, let implementation of "zip" and "merge" follow from the
examples.
  • Loading branch information
lizmat committed Apr 20, 2014
1 parent 05ea68b commit 0d7e427
Showing 1 changed file with 72 additions and 92 deletions.
164 changes: 72 additions & 92 deletions S17-concurrency.pod
Expand Up @@ -650,101 +650,81 @@ implementation, since values may arrive at any point on each, and possibly at
the same time. To help write such combinators, the C<on> meta-combinator is
useful. C<on> taps many supplies, and ensures that only B<one> callback will be
running at a time, freeing the combinator writer of worrying about
synchronization issues. Here is how C<zip> is implemented:

method zip(Supply $a, Supply $b, &with = &infix:<,>) {
my @as;
my @bs;
on -> $res {
$a => sub ($val) {
@as.push($val);
if @as && @bs {
$res.more(with(@as.shift, @bs.shift));
}
},
$b => sub ($val) {
@bs.push($val);
if @as && @bs {
$res.more(with(@as.shift, @bs.shift));
}
}
}
}

Conjecture: this is going to end up looking more like:

method zip(Supply $a, Supply $b, &with = &infix:<,>) {
my @as;
my @bs;
combine $a, $b -> $out {
more * {
($:k ?? @bs !! @as).push($_);
if @as && @bs {
$out.more(with(@as.shift, @bs.shift));
}
}
}
}

or even:

method zip(*@in, :&with = &infix:<,>) {
my @streams = [] xx @in;
combine @in -> $out {
more * {
@streams[$:k].push($_);
if all @streams».elems > 0 {
$out.more(with(|@streams));
}
}
}
}

(As with the reaction routines for C<winner>, in C<combine> the
reaction routines C<$:k> is passed as the position of the matching
entry from the list supplied to C<combine>, and C<$:v> is the actual
supply object that was passed in that position and that is being
tapped for its value.)
synchronization issues.

The C<on> combinator takes a block that receives the C<Supply> it will generate
(and return) as the parameter. That block is supposed to return list of
C<Pairs>, in which the keys are one or more C<Supply>s. And the values are
either a C<Block> (to be called for each value for that C<Supply>), or a
hash with Pairs for C<more>, C<done> and C<quit>.

A simple combinator for C<Pair>ing values from two C<Supply>s ($a and $b),
would look like this:

my $result = on -> $res {
my @as;
my @bs;
on -> $res {
$a => sub ($val) {
@as.push($val);
if @as && @bs {
$res.more( @as.shift => @bs.shift );
}
},
$b => sub ($val) {
@bs.push($val);
if @as && @bs {
$res.more( @as.shift => @bs.shift );
}
}
}
}

Thus there is never any race or other thread-safely problems with
mutating the C<@as> and C<@bs>. The default behaviour, if a callable is
mutating the C<@as> and C<@bs>. The default behaviour, if a C<Callable> is
specified along with the supply, is to use it for C<more> and provide
a default C<done> and C<quit>. The default C<done> triggers C<done>
on the result supply, which is the correct semantics for C<zip>. On
the other hand, C<merge> wants different semantics, and so must
provide a C<done>. This can be implemented as follows:

method merge(Supply $a, Supply $b) {
my $done = 0;
on -> $res {
$a => {
more => sub ($val) { $res.more($val) },
done => {
$res.done() if ++$done == 2;
}
},
$b => {
more => sub ($val) { $res.more($val) },
done => {
$res.done() if ++$done == 2;
}
}
}
}

or conjecturally:

method merge(*@ins) {
my $done = 0;
combine @ins -> $out {
more * { $out.more($_) }
done * { $out.done() if ++$done == +@ins; }
}
}

A C<quit> handler can be provided in a similar way, although the default - convey the
failure to the result supply - is normally what is wanted. The exception
is writing combinators related to error handling.
a default C<done> and C<quit>. The default C<done> triggers C<done> on the
result C<Supply>.

Note that the code blocks for both C<Supply>s are identical. There must be
a better way of doing this. And indeed, there is: you can also specify more
than one C<Supply> per block. The same as above implemented using that:

my $result = on -> $res {
my @values = ([],[]);
($a,$b) => sub ($val,$index) {
@values[$index].push($val);
if all(@values) {
$res.more( (@values>>.shift) );
}
}
}

Note that the block that is being called for each value from any of the
C<Supply>s also receives an index value to be able to group the values
received. By default, any C<done> or C<quit> will be immediately propagated.
This is basically how C<zip> is implemented.

Sometimes, we want the resulting C<Supply> to be C<done> only when all
specified C<Supply>s are done. This is possible by specifying a hash with
keys for C<more>, C<done> and/or C<quit>, instead of just a C<Callable>.
Given an array @s with C<Supply>s:

my $done = 0;
my $result = on -> $res {
@s => {
more => -> \val { $res.more(val) },
done => { $res.done if ++$done == +@s }
}
}

This is essentially how C<merge> is implemented. Note that if we don't need
the index (as indicated by its absence in the signature of the C<Callable>s),
it will not be passed.

A C<quit> handler can be provided in a similar way, although the default -
convey the failure to the result supply - is normally what is wanted. The
exception is writing combinators related to error handling.

=head1 The Event Loop

Expand Down

0 comments on commit 0d7e427

Please sign in to comment.