Skip to content

Commit

Permalink
stream concatenation
Browse files Browse the repository at this point in the history
  • Loading branch information
nothingmuch committed Sep 1, 2008
1 parent 11d8cf4 commit 760fe64
Show file tree
Hide file tree
Showing 5 changed files with 249 additions and 0 deletions.
41 changes: 41 additions & 0 deletions lib/Data/Stream/Bulk.pm
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,34 @@ sub all {
return @ret;
}

sub cat {
my ( $self, @streams ) = @_;

return $self unless @streams;

my @cat = $self->list_cat(@streams);

unless ( @cat ) {
return Data::Stream::Bulk::Nil->new;
} elsif ( @cat == 1 ) {
return $cat[0];
} else {
return Data::Stream::Bulk::Cat->new(
streams => \@cat,
);
}
}

sub list_cat {
my ( $self, $head, @tail ) = @_;

return $self unless $head;
return ( $self, $head->list_cat(@tail) );
}

# load it *after* the entire role is defined
require Data::Stream::Bulk::Cat;

__PACKAGE__

__END__
Expand Down Expand Up @@ -115,6 +143,19 @@ Force evaluation of the entire resultset.
Note that for large data sets this might cause swap thrashing of various other
undesired effects. Use with caution.
=item cat @streams
Concatenates this stream with @streams, returning a single stream.
=item list_cat @tail
Returns a possibly cleaned up list of streams.
Used by C<cat>.
Overridden by L<Data::Stream::Bulk::Array>, L<Data::Stream::Bulk::Cat> and
L<Data::Stream::Bulk::Nil> to implement some simple short circuiting.
=back
=head1 CLASSES
Expand Down
27 changes: 27 additions & 0 deletions lib/Data/Stream/Bulk/Array.pm
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,29 @@ sub next {
}
}

# squish several arrays into one
sub list_cat {
my ( $self, @rest ) = @_;

return $self unless @rest;

my @arrays = ( $self );

# fetch all adjacent arrays
push @arrays, shift @rest while @rest and $rest[0]->isa(__PACKAGE__);

if ( @arrays > 1 ) {
my @cat;
push @cat, @$_ for map { $_->_array } @arrays;
return __PACKAGE__->new(
array => \@cat,
)->cat( @rest );
} else {
my $head = shift @rest;
return ( $self, $head->list_cat(@rest) );
}
}

__PACKAGE__->meta->make_immutable;

__PACKAGE__
Expand Down Expand Up @@ -79,6 +102,10 @@ Returns the array reference on the first invocation, and nothing thereafter.
Returns true if C<next> has been called.
=item list_cat
Squishes adjacent arrays into a new array.
=back
=cut
93 changes: 93 additions & 0 deletions lib/Data/Stream/Bulk/Cat.pm
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#!/usr/bin/perl

package Data::Stream::Bulk::Cat;
use Moose;

use namespace::clean -except => 'meta';

with qw(Data::Stream::Bulk);

has streams => (
isa => "ArrayRef[Data::Stream::Bulk]",
is => "ro",
required => 1,
);

sub is_done {
my $self = shift;
@{ $self->streams } == 0;
}

sub next {
my $self = shift;

my $s = $self->streams;

return unless @$s;

my $next;

until ( $next = @$s && $s->[0]->next ) {
shift @$s;
return unless @$s;
}

return $next;
}

sub list_cat {
my ( $self, @rest ) = @_;
my ( $head, @tail ) = ( @{ $self->streams }, @rest );
return () unless $head;
return $head->list_cat(@tail);
}

__PACKAGE__->meta->make_immutable;

__PACKAGE__

__END__
=pod
=head1 NAME
Data::Stream::Bulk::Cat - Concatenated streams
=head1 SYNOPSIS
use Data::Stream::Bulk::Cat;
Data::Stream::Bulk::Cat->new(
streams => [ $s1, $s2, $s3 ],
);
=head1 DESCRIPTION
This stream is a concatenation of several other streams.
=head1 METHODS
=over 4
=item is_done
Returns true if the list of streams is empty.
=item next
Returns the next block from the next ready stream.
=item list_cat
Breaks down the internal list of streams, and delegates C<list_cat> to the
first one.
Has the effect of inlining the nested streams into the total concatenation,
allowing L<Data::Stream::Bulk::Array/list_cat> to work better.
=back
=cut
11 changes: 11 additions & 0 deletions lib/Data/Stream/Bulk/Nil.pm
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ sub next { undef }

sub is_done { 1 }

sub list_cat {
my ( $self, $head, @rest ) = @_;

return () unless $head;
return $head->list_cat(@rest);
}

__PACKAGE__->meta->make_immutable;

__PACKAGE__
Expand Down Expand Up @@ -43,6 +50,10 @@ Always returns true.
Always returns undef.
=item list_cat
Skips $self
=back
=cut
Expand Down
77 changes: 77 additions & 0 deletions t/basic.t
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use ok 'Data::Stream::Bulk::Util' => qw(bulk nil);
isa_ok( nil, "Data::Stream::Bulk::Nil", "nil() helper" );

isa_ok( bulk(), "Data::Stream::Bulk::Nil", "bulk() helper with no items" );

isa_ok( nil->cat(nil), "Data::Stream::Bulk::Nil", "cating nil with nil results in nil" );
}

{
Expand All @@ -35,6 +37,14 @@ use ok 'Data::Stream::Bulk::Util' => qw(bulk nil);
ok( !$d->next, "no next block" );

is_deeply( bulk(@array)->next, \@array, "bulk() helper" );

isa_ok( nil->cat(bulk(@array)), "Data::Stream::Bulk::Array", "nil cat Array results in Array" );

my $cat = bulk(qw(foo bar))->cat(bulk(qw(gorch baz)));

isa_ok( $cat, "Data::Stream::Bulk::Array", "Array cat Array resuls in Array" );

is_deeply( $cat->next, \@array, "concatenated array into one block" );
}

{
Expand Down Expand Up @@ -81,3 +91,70 @@ use ok 'Data::Stream::Bulk::Util' => qw(bulk nil);

ok( $d->is_done, "done" );
}

{
my @array = qw(foo bar);

my $cb = sub { @array && [ shift @array ] };

my $d = Data::Stream::Bulk::Callback->new( callback => $cb )->cat(bulk(qw(gorch baz)));

isa_ok( $d, "Data::Stream::Bulk::Cat" );

ok( !$d->is_done, "not done" );
is_deeply( [ $d->items ], [ "foo" ], "items method" );
ok( !$d->is_done, "not done" );
is_deeply( [ $d->items ], [ "bar" ], "items method" );
ok( !$d->is_done, "not done" );
is_deeply( [ $d->items ], [ qw(gorch baz) ], "reached array" );
ok( !$d->is_done, "not done" );
is_deeply( [ $d->items ], [ ], "items method" );

ok( $d->is_done, "now it's done" );

ok( !$d->next, "no next" );
}

{
my @array = qw(foo bar);
my $cb = sub { @array && [ shift @array ] };

my $d = nil->cat(bulk(qw(gorch baz))->cat(Data::Stream::Bulk::Callback->new( callback => $cb )->cat(bulk(qw(oi))->cat(nil->cat(bulk("vey"))))->cat(nil))->cat(nil))->cat(nil)->cat(Data::Stream::Bulk::Callback->new( callback => $cb )->cat(bulk(qw(last))));

isa_ok( $d, "Data::Stream::Bulk::Cat" );

is_deeply(
[ map { ref } @{ $d->streams } ],
[
"Data::Stream::Bulk::Array", # qw(gorch baz)
"Data::Stream::Bulk::Callback", # first cb
"Data::Stream::Bulk::Array", # "oi" cat "vey"
"Data::Stream::Bulk::Callback", # second CB
"Data::Stream::Bulk::Array", # "last"
],
"list_cat simplified concatenation",
);

ok( !$d->is_done, "not done" );
is_deeply( [ $d->items ], [ qw(gorch baz) ], "array block" );
ok( !$d->is_done, "not done" );
is_deeply( [ $d->items ], [ "foo" ], "items method" );
ok( !$d->is_done, "not done" );
is_deeply( [ $d->items ], [ "bar" ], "items method" );
ok( !$d->is_done, "not done" );
is_deeply( [ $d->items ], [ qw(oi vey) ], "items method" );
ok( !$d->is_done, "not done" );
is_deeply( [ $d->items ], [ "last" ], "items method" );
ok( !$d->is_done, "not done" );
is_deeply( [ $d->items ], [ ], "items method" );

ok( $d->is_done, "now it's done" );

ok( !$d->next, "no next" );

is_deeply(
[ map { ref } @{ $d->streams } ],
[ ],
"no streams in concatenated",
);
}

0 comments on commit 760fe64

Please sign in to comment.