A lazy enumerable implementation for Perl5
Switch branches/tags
Nothing to show
Clone or download
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Permalink
Failed to load latest commit information.
lib/Data/Enumerable
t/Data/Enumerable/Lazy.pm
.gitignore
LICENSE.txt
README

README

NAME
    Data::Enumerable::Lazy

SYNOPSIS
    A basic lazy range implementation picking even numbers only:

      my ($from, $to) = (0, 10);
      my $current = $from;
      my $tream = Data::Enumerable::Lazy->new({
        on_has_next => sub { $current <= $to          },
        on_next     => sub { shift->yield($current++) },
      })->grep(sub{ shift % 2 == 0 });
      $tream->to_list(); # generates: [0, 2, 4, 6, 8, 10]

  DESCRIPTION
    This library is another one implementation of a lazy generator +
    enumerable for Perl5. It might be handy if the elements of the
    collection are resolved on the flight and the iteration itself should be
    hidden from the end users.

    The enumerables are single-pass composable calculation units. What it
    means: An enumerable is stateful, once it reached the end of the
    sequence, it will not rewind to the beginning unless explicitly forced
    to. Enumerables are composable: one enumerable might be an extension of
    another by applying some additional logic. Enumerables resolve steps on
    demand, one by one. A single step might return another enumerable (micro
    batches). The library flattens these enumerables, so for the end user
    this looks like a single continuous sequence of elements.

      [enumerable.has_next] -> [_buffer.has_next] -> yes -> return true
                                                  -> no -> result = [enumerable.on_has_next] -> return result

      [enumerable.next] -> [_buffer.has_next] -> yes -> return [_buffer.next]
                                              -> no -> result = [enumerable.next] -> [enumerable.set_buffer(result)] -> return result

EXAMPLES
  A basic range
    This example implements a range generator from $from until $to. In order
    to generate this range we define 2 callbacks: `on_has_next()' and
    `on_next()'. The first one is used as point of truth whether the
    sequence has any more non-iterated elements, and the 2nd one is here to
    return the next element in the sequence and the one that changes the
    state of the internal sequence iterator.

      sub basic_range {
        my ($from, $to) = @_;
        $from <= $to or die '$from should be less or equal $to';
        my $current = $from;
        Data::Enumerable::Lazy->new({
          on_has_next => sub {
            return $current <= $to;
          },
          on_next => sub {
            my ($self) = @_;
            return $self->yield($current++);
          },
        });
      }

    on_has_next() makes sure the current value does not exceed $to value,
    and on_next() yields the next value of the sequence. Note the yield
    method. An enumerable developer is expected to use this method in order
    to return the next step value. This method does some internal
    bookkeeping and smart caching.

    Usage:

    # We initialize a new range generator from 0 to 10 including. my $range
    = basic_range(0, 10); # We check if the sequence has elements in it's
    tail. while ($range->has_next) { # In this very line the state of $range
    is being changed say $range->next; }

      is $range->has_next, 0, '$range has been iterated completely'
      is $range->next, undef, 'A fully iterated sequence returns undef on next()'

  Prime numbers
    Prime numbers is an infinite sequence of natural numbers. This example
    implements a very naive suboptimal prime number generator.

      my $prime_num_stream = Data::Enumerable::Lazy->new({
        # This is an infinite sequence
        on_has_next => sub { 1 },
        on_next => sub {
          my $self = shift;
          # We save the result of the previous step
          my $next = $self->{_prev_} // 1;
          LOOKUP: while (1) {
            $next++;
            # Check all numbers from 2 to sqrt(N)
            foreach (2..floor(sqrt($next))) {
              ($next % $_ == 0) and next LOOKUP;
            }
            last LOOKUP;
          }
          # Save the result in order to use it in the next step
          $self->{_prev_} = $next;
          # Return the result
          $self->yield($next);
        },
      });

    What's remarkable regarding this specific example is that one can not
    simply call `to_list()' in order to get all elements of the sequence.
    The enumerable will throw an exception claiming it's an infinitive
    sequence. Therefore, we should use `next()' in order to get elements one
    by one or use another handy method `take()' which returns first N
    results.

  Flat enumeration (Nested generators)
    In this example we will output a numbers of a multiplication table
    10x10. What's interesting in this example is that there are 2 levels of
    sequences: primary and secondary. Primary `on_next()' returns secondary
    sequence, which multiplicates 2 numbers.

      # A new stream based on a range from 1 to 10
      my $mult_table = Data::Enumerable::Lazy->from_list(1..10)->continue({
        on_has_next => sub {
          my ($self, $i) = @_;
          # The primary stream returns another sequence, based on range
          $self->yield(Data::Enumerable::Lazy->from_list(1..10)->continue({
            on_next => sub {
              # $_[0] is a substream self
              # $_[1] is a next substream sequence element
              $_[0]->yield( $_[1] * $i )
            },
          }));
        },
      });

    Another feature which is represented here is the nested result
    generation. Let's walk trough the sequence generation step by step and
    see what happens.

      $mult_table->has_next; # returns true based on the primary range, _buffer is
                             # empty
      $mult_table->next;     # returns 1, the secondary sequence is now stored as
                             # the primary enumerable buffer and 1 is being served
                             # from this buffer
      $mult_table->has_next; # returns true, resolved by the state of the buffer
      $mult_table->next;     # returns 2, moves buffer iterator forward, the
                             # primary sequence on_next() is _not_ being called
                             # this time
      $mult_table->next for (3..10); # The last iteration completes the buffer
                             # iteration cycle
      $mult_table->has_next; # returns true, but now it calls the primary
                             # on_has_next()
      $mult_table->next;     # returns 2 as the first element in the next
                             # secondary sequence (which is 1 again) multiplied by
                             # the 2nd element of the primary sequence (which is 2)
      $mult_table->to_list;  # Generates the tail of the sesquence:
                             # [4, 6, ..., 80, 90, 100]
      $mult_table->has_next; # returns false as the buffer is empty now and the
                             # primary sequence on_has_next() says there is nothing
                             # more to iterate over.

  DBI paginator example
    As mentioned earlier, lazy enumerables are useful when the number of the
    elements in the sequence is not known in advance. So far, we were
    looking at some synthetic examples, but the majority of us are not being
    paid for prime number generators. Hands on some real life example. Say,
    we have a table and we want to iterate over all entries in the table,
    and we want the data to be retrieved in batches by 10 elements in order
    to reduce the number of queries. We don't want to compute the number of
    steps in advance, as the number might be inaccurate: let's assume we're
    paginating over some new tweets and the new entries might be created on
    the flight.

      use DBI;
      my $dbh = setup_dbh(); # Some config

      my $last_id = -1;
      my $limit = 10;
      my $offset = 0;
      my $tweet_enum = Data::Enumerable::Lazy->new({
        on_has_next => sub {
          my $sth = $dbh->prepare('SELECT count(1) from Tweets where id > ?');
          $sth->execute($last_id);
          my ($cnt) = $sth->fetchrow_array;
          return int($cnt) > 0;
        },
        on_next => sub {
          my ($self) = @_;
          my $sth = $dbh->prepare('SELECT * from Tweets ORDER BY id LIMIT ? OFFSET ?');
          $sth->execute($lmit, $offset);
          $offset += $limit;
          my @tweets = $sth->fetchrow_array;
          $last_id = $tweets[-1]->{id};
          $self->yield(Data::Enumerable::Lazy->from_list(@tweets));
        },
        is_finite => 1,
      });

      while ($tweet_enum->has_next) {
        my $tweet = $tweet_enum->next;
        # do something with this tweet
      }

    In this example a tweet consumer is abstracted from any DBI bookkeeping
    and consumes tweet entries one by one without any prior knowledge about
    the table size and might work on a rapidly growing dataset.

    In order to reduce the number of queries, we query the data in batches
    by 10 elements max.

  Redis queue consumer
      use Redis;

      my $redis = Redis->new;
      my $queue_enum = Data::Enumerable::Lazy->new({
        on_has_next => sub { 1 },
        on_next => sub {
          # Blocking right POP
          $redis->brpop();
        },
      });

      while (my $queue_item = $queue_enum->next) {
        # do something with the queue item
      }

    In this example the client is blocked until there is an element
    available in the queue, but it's hidden away from the clients who
    consume the data item by item.

  Kafka example
    Kafka consumer wrapper is another example of a lazy calculation
    application. Lazy enumerables are very naturally co-operated with
    streaming data, like Kafka. In this example we're fetching batches of
    messages from Kafka topic, grep out corrupted ones and proceed with the
    mesages.

      use Kafka qw($DEFAULT_MAX_BYTES);
      use Kafka::Connection;
      use Kafka::Consumer;

      my $kafka_consumer = Kafka::Consumer->new(
        Connection => Kafka::Connection->new( host => $my_host, ... ),
      );

      my $partition = 0;
      my $offset = 0;
      my $kafka_enum = Data::Enumerable::Lazy->new({
        on_has_next => sub { 1 },
        on_next => sub {
          my ($self) = @_;
          # Fetch messages in batch
          my $messages = $kafka_consumer->fetch({
            'topic',
            $partition,
            $offset,
            $DEFAULT_MAX_BYTES
          });
          if ($messages) {
            # Note the grep function applied: we're filtering away corrupted messages
            $self->yield(Data::Enumerable::Lazy->from_list(@$messages))->grep(sub { $_[0]->valid });
          } else {
            # If there are no more messages, we return an empty enum, this is
            # another handy use-case for nested enums.
            $self->yield(Data::Enumerable::Lazy->empty);
          }
        },
      });

      while (my $message = $kafka_enum->next) {
        # handle the message
      }

OPTIONS
  on_next($self, $element) :: CodeRef -> Data::Enumerable::Lazy | Any
    `on_next' is a code ref, a callback which is being called every time the
    generator is in demand for a new bit of data. Enumerable buffers up the
    result of the previous calculation and if there are no more elements
    left in the buffer, `on_next()' would be called.

    `$element' is defined when the current collection is a contuniation of
    another enumerable. I.e.: my $enum =
    Data::Enumerable::Lazy->from_list(1, 2, 3); my $enum2 =
    $enum->continue({ on_next => sub { my ($self, $i) = @_; $self->yield($i
    * $i) } }); $enum2->to_list; # generates 1, 4, 9 In this case $i would
    be defined and it comes from the original enumerable.

    The function is supposed to return an enumerable, in this case it would
    be kept as the buffer object. If this function method returns any other
    value, it would be wrapped in a `Data::Enumerable::Lazy-'singular()>.
    There is a way to prevent an enumerable from wrapping your return value
    in an enum and keeping it in a raw state by providing `no_wrap=1'.

  on_has_next($self) :: CodeRef -> Bool
    `on_has_next' is a code ref, a callback to be called whenever the
    enumerable is about to resolve `has_next()' method call. Similar to
    `on_next()' call, this one is also triggered whenever an enumerable runs
    out of buffered elements. The function shoiuld return boolean.

    A method that returns 1 all the time is the way to initialize an
    infinite enumerable (see `infinity()'). If it returns 0 no matter what,
    it would be an empty enumerable (see `empty()'). Normally you want to
    stay somewhere in the middle and implement some state check login in
    there.

  on_reset($self) :: CodeRef -> void
    This is a callback to be called in order to reset the state of the
    enumerable. This callback should be defined in the same scope as the
    enumerable itself. The library provides nothing magical but a callback
    and a handle to call it, so the state cleanup is completely on the
    developer's side.

  is_finite :: Bool
    A boolean flag indicating whether an enumerable is finite or not. By
    default enumerables are treated as infinite, which means some functions
    will throw an exception, like: `to_list()' or `resolve()'.

    Make sure to not mark an enumerable as finite and to call finite-size
    defined methods, in this case it will create an infinite loop on the
    resolution.

INSTANCE METHODS
  next()
    Function `next()' is the primary interface for accessing elements of an
    enumerable. It will do some internal checks and if there is no elements
    to be served from an intermediate buffer, it will resolve the next step
    by calling `on_next()' callback. Enumerables are composable: one
    enumerable might be based on another enumeration. E.g.: a sequence of
    natural number squares is based on the sequence of natural numbers
    themselves. In other words, a sequence is defined as a tuple of another
    sequence and a function which would be lazily applied to every element
    of this sequence.

    `next()' accepts 0 or more arguments, which would be passed to
    `on_next()' callback.

    `next()' is expected to do the heavy-lifting job in opposite to
    `has_next()', which is supposed to be cheap and fast. This statement
    flips upside down whenever `grep()' is applied to a stream. See `grep()'
    for more details.

  has_next()
    `has_next()' is the primary entry point to get an information about the
    state of an enumerable. If the method returned false, there are no more
    elements to be consumed. I.e. the sequence has been iterated completely.
    Normally it means the end of an iteration cycle.

    Enumerables use internal buffers in order to support batched `on_next()'
    resolutions. If there are some elements left in the buffer, `on_next()'
    won't call `on_has_next()' callback immediately. If the buffer has been
    iterated completely, `on_has_next()' would be called.

    `on_next()' should be fast on resolving the state of an enumerable as
    it's going to be used for a condition state check.

  reset()
    This method is a generic entry point for a enum reset. In fact, it is
    basically a wrapper around user-defined `on_reset()'. Use with caution:
    if `on_reset()' was not defined, it will reset the buffer and might
    cause a partial calculation skip (reset implicitly clears the internal
    buffer) if the buffer was not fully iterated yet.

  to_list()
    This function transforms a lazy enumerable to a list. Only finite
    enumerables can be transformed to a list, so the method checks if an
    enumerable is created with `is_finite=1' flag. An exception would be
    thrown otherwise.

  map($callback)
    Creates a new enumerable by applying a user-defined function to the
    original enumerable. Works the same way as perl map {} function but it's
    lazy.

    Example Data::Enumerable::Lazy ->from_array(1, 2, 3) ->map(sub { my
    ($number) = @_; return $number * $number });

  reduce($acc, $callback)
    Resolves the enumerable and returns the resulting state of the
    accumulator $acc provided as the 1st argument. `$callback' should always
    return the new state of `$acc'.

    `reduce()' is defined for finite enumerables only.

    Example Data::Enumerable::Lazy ->from_array(1, 2, 3) ->reduce(1, sub {
    my ($acc, $number) = @_; return $acc *= $number });

  grep($callback, $max_lookahead)
    `grep()' is a function which returns a new enumerable by applying a
    user-defined filter function.

    `grep()' might be applied to both finite and infinite enumerables. In
    case of an infinitive enumerable there is an additional argument
    specifying max number of lookahead steps. If an element satisfying the
    condition could not be found in `max_lookahead' steps, an enumerable is
    considered to be completely iterated and `has_next()' will return false.

    `grep()' returns a new enumerable with quite special properties:
    `has_next()' will perform a look ahead and call the original enumerable
    `next()' method in order to find an element for which the user-defined
    function will return true. `next()', on the other side, returns the
    value that was pre-fetched by `has_next()'.

    Example Data::Enumerable::Lazy ->from_list(1, 2, 3) ->grep(sub { my
    ($number) = @_; return $number % 2 })

  resolve()
    Resolves an enumerable completely. Applicable for finite enumerables
    only. The method returns nothing.

  take($N_elements)
    Resolves first $N_elements and returns the resulting list. If there are
    fewer than N elements in the enumerable, the entire enumerable would be
    returned as a list.

  take_while($callback)
    This function takes elements until it meets the first one that does not
    satisfy the conditional callback. The callback takes only 1 argument: an
    element. It should return true if the element should be taken. Once it
    returned false, the stream is over.

  continue($ext = %{ on_next => sub {}, ... })
    Creates a new enumerable by extending the existing one. on_next is the
    only manfatory argument. on_has_next might be overriden if some custom
    logic comes into play.

    is_finite is inherited from the parent enumerable by default. All
    additional attributes would be transparently passed to the constuctor.

  count()
    Counts the number of the elements in the stream. This method iterates
    through the stream so it makes it exhausted by the end of the
    computatuion.

  yield($result)
    This method is supposed to be called from `on_next' callback only. This
    is the only valid result for an Enumerable to return the next step
    result. Effectively, it ensures the returned result conforms to the
    required interface and is wrapped in a lazy wrapper if needed.

CLASS METHODS
  empty()
    Returns an empty enumerable. Effectively it means an equivalent of an
    empty array. `has_next()' will return false and `next()' will return
    undef. Useful whenever a `on_next()' step wants to return an empty
    resultset.

  singular($val)
    Returns an enumerable with a single element $val. Actively used as an
    internal data container.

  from_list(@list)
    Returns a new enumerable instantiated from a list. The easiest way to
    initialize an enumerable. In fact, all elements are already resolved so
    this method sets `is_finite=1' by default.

  cycle()
    Creates an infinitive enumerable by cycling the original list. E.g. if
    the original list is [1, 2, 3], `cycle()' will generate an infinitive
    sequences like: 1, 2, 3, 1, 2, 3, 1, ...

  infinity()
    Returns a new infinite enumerable. `has_next()' always returns true
    whereas `next()' returns undef all the time. Useful as an extension
    basis for infinite sequences.

  merge($tream1 [, $tream2 [, $tream3 [, ...]]])
    This function merges one or more streams together by fan-outing `next()'
    method call among the non-empty streams. Returns a new enumerable
    instance, which: * Has next elements as far as at least one of the
    streams does. * Returns next element py picking it one-by-one from the
    streams. * Is finite if and only if all the streams are finite. If one
    of the streams is over, it would be taken into account and `next()' will
    continue choosing from non-empty ones.

  chain($tream1(, $tream2(, $tream3(, ...))))
    Executes streams sequentually, one after another: the next stream starts
    once the previous is over.

  from_text_file($file_handle(, $options))
    Method takes an open file handle and an optional hash of options and
    creates a stream of it. The file would be read as a text file, line by
    line. For additional options see `open()' perl core function reference.
    Options is a basic hash, supported attributes are: * chomp :: Bool |
    Whether the lines should be chomped, 0 by default. * is_finite :: Bool |
    Forces the stream to be processed as finite, 0 by default.

  from_bin_file($file_handle(, $options))
    Method similar to `from_text_file()' but forces binary reading from
    file. Takes a file handle created by `open()' function and an optional
    hash of options. Supported attributes are: * block_size :: Integer | The
    size of read block, 1024 bytes by default. * is_finite :: Bool | Forces
    the stream to be processed as finite, 0 by default.

AUTHOR
    Oleg S <me@whitebox.io>

SEE ALSO
  Lazy evaluation in a nutshell
    https://en.wikipedia.org/wiki/Lazy_evaluation

  Library GitHub page:
    https://github.com/icanhazbroccoli/Perl-Data-Enumerable-Lazy

  Alternative implementations:
    https://metacpan.org/pod/List::Generator
    https://metacpan.org/pod/Generator::Object
    https://metacpan.org/pod/Iterator

COPYRIGHT AND LICENSE
    Copyright 2017, 2018 Oleg S <me@whitebox.io>

    Copying and distribution of this file, with or without modification, are
    permitted in any medium without royalty provided the copyright notice
    and this notice are preserved. This file is offered as-is, without any
    warranty.