muir/Stream-Aggregate

NAME
     Stream::Aggregate - generate aggregate information from a stream of data

SYNOPSIS
     use Stream::Aggregate;

     my $af = generate_aggregation_func(
            $agg_config, 
            $extra_parameters, 
            $user_extra_parameters);

     while ($log = ???) {
            @stats = $af->($log);
     }
     @stats = $af->(undef);

DESCRIPTION
    Stream::Aggregate is a general-purpose aggregation module that will
    aggregate from a stream of perl objects. While it was written
    specifically for log processing, it can be used for other things too.

    Aggregation has two key elements: how you group things and what you
    aggregate. This module understands two different ways to group things:
    nested and cross-product.

    Nested groupings come from processing sorted input: if three fields make
    up your context, the input must be sorted by those fields, in the same
    order in which they appear in the context. If you want to count things
    by URL, then you must sort your input by URL.

    Cross-product groupings come from processing unsorted input. Each
    combination of values of the fields that make up your context is another
    context. This can lead to memory exhaustion so you must specify the
    maximum number of values for each of the fields.

  Nested groupings
    Nested groups are most easily illustrated with a simple example:
    aggregating by year, month, and day. The input data must be sorted by
    year, month, and day. The current context is defined by the triplet:
    (year, month, day). That triplet must be returned by the "context" code.
    It is stored in the @current_context array. When a context is finished,
    it must be converted into a hash by "context2columns".

    Doing it this way, you can, for example, get the average of some data
    item per day, per month, and per year in one pass through your data.
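
    A configuration for this example might look like the following sketch.
    It assumes each input record is a hash with "year", "month", "day", and
    "bytes" fields; those field names and the "avg_bytes" column are
    hypothetical and only serve the illustration.

     context: |
       return ($log->{year}, $log->{month}, $log->{day})
     context2columns: |
       # Assumption: when a month or year context closes, the deeper
       # levels are absent from @current_context and come back undefined.
       return (
         year  => $current_context[0],
         month => $current_context[1],
         day   => $current_context[2],
       )
     mean:
       avg_bytes: $log->{bytes}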

  Cross-Product grouping
    Cross-product grouping does not depend on the sort order of the input
    and can have many contexts active at the same time.

    For example, if you're aggregating sales figures for shoes and want
    statistics for the combinations of size, width, and color, there isn't a
    sort or nesting order that will answer your questions.

    Use "crossproduct" to limit yourself to a certain number of values for
    each variable (say 10 sizes, 3 widths, and 5 colors).
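
    The shoe example could be configured roughly as follows. This is a
    sketch: the input fields ("size", "width", "color", "amount") and the
    output column names are hypothetical. The crossproduct keys are declared
    as "ephemeral" columns because the crossproduct dimensions must be
    ephemeral column names.

     ephemeral:
       size:   $log->{size}
       width:  $log->{width}
       color:  $log->{color}
     crossproduct:
       size:   10
       width:  3
       color:  5
     sum:
       total_sales: $log->{amount}
     output:
       pairs_sold: $ps->{item_counter}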

  API
    The configuration for Stream::Aggregate is compiled into a perl function
    which is then called once for each input object. Each time it is called,
    it may produce one or more aggregate objects as output. When there is no
    more input data, call the aggregation function with "undef" as its
    input.

    The generate-the-function routine, "generate_aggregation_func", takes
    three parameters. The first is the configuration object (defined below).
    The configuration object is expected (but not required) to come from a
    YAML file. All examples are in YAML format. The second and third
    arguments provide extra information. Currently they are only used to get
    a description of what this aggregation is trying to do using the "name"
    field. For example:

     generate_aggregation_func($agg_config, $extra, $user_extra);

     my $code = qq{#line 1 "FAKE-all-code-for-$extra->{name}"\n};

    The configuration object for Stream::Aggregate is expected to be read
    from a YAML file but it does not have to be created that way.

    For some of the code fields (below), marked as Closure/Config, you can
    provide a closure instead of code. To do that, have a "BEGIN" block
    assign a value (the closure) to the variable $coderef. If you do this,
    code outside the "BEGIN" block will only be compiled but will never be
    run. When evaluating the BEGIN block, the variable $agg_config will be
    set to the value of *key_config* (assuming the field was *key*).

    The behavior of "generate_aggregation_func" in array context may change
    in the future to provide additional return values.

  CONTEXT-RELATED CONFIGURATION PARAMETERS
    As the aggregator runs over the input, it needs to know the boundaries of
    the contexts so that it knows when to generate an aggregation result
    record.

    For example, if you were aggregating information about URL with nested
    groupings, you need to sort your input by URL and you need to define a
    "context" that returns the URL (in YAML format, with $log as your input
    variable):

     context: |
       return ($log->{url})

    If you want to aggregate over both the URL and the web host, the
    "context" must return an array: host & URL (in YAML format):

     context: |
       $log->{url} =~ m{(?:https?|ftp)://([^/:]+)};
       my $host = $1;
       return ($host, $log->{url})

    When the context has multiple levels like that, there will be a
    resultant aggregation record for each value at each level.
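
    To name the levels of the host and URL context in the result records, a
    "context2columns" entry along the following lines could be paired with
    it. This is a sketch modeled on the second example at the end of this
    document; the column names "host" and "url" are arbitrary choices.

     context2columns: |
       return (
         host => $current_context[0],
         url  => $current_context[1],
       )
     finalize_result: |
       # drop the roll-up record that covers every host
       $suppress_result = 1 unless defined $row->{host};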

    context                Code, Optional. Given a $log entry, return an
                           array that describes the aggregation context. For
                           example, for a URL, this array might be: domain
                           name; host name (if different from domain name);
                           each component of the path of the URL except the
                           final filename. As Aggregate runs, it will
                           generate an aggregation record for each element
                           of the array.

                           This code will be invoked on every input record.

                           This is not required for cross-product
                           aggregations but is required for nested
                           aggregations.

    context2columns        Code, Optional. Given a context, in
                           @current_context, return additional key/value
                           pairs for the resulting aggregation record. This
                           is how the context gets described in the
                           aggregation results records.

                           This code will be invoked to generate resultant
                           values just before a context is closed.

                           If this code sets the variable $suppress_result,
                           then this aggregation result will be discarded.

    stringify_context      Code, Optional.

                           If the new context array returned by the
                           "context" code (soon to become @current_context)
                           is not an array of strings but rather an array of
                           references, it will be turned into strings using
                           YAML. These strings may not matter because you
                           control how the context manifests in the result
                           records with "context2columns".

                           If this isn't what you want, use
                           "stringify_context" to do something different.
                           Unlike most of the other functions,
                           "stringify_context" operates on @_.

                           This will be invoked for every input record.

    crossproduct           Hash, Name->Number, Optional.

                           For crossproduct groupings, this defines the
                           dimensions. The keys are the variables. The
                           values are the maximum number of values for each
                           variable to track.

                           The keys must be "ephemeral0", "ephemeral", or
                           "ephemeral2" column names.

    combinations           Hash, Name->Code, Optional.

                           If you have crossproduct groupings, do you also
                           want to synthesize contexts that exclude some or
                           all of the crossproduct dimensions?

                           For each dimension, provide code that
                           answers the question: if you remove this
                           dimension from the crossproduct, should this new
                           context be considered? This code can look at $row
                           to see what other dimensions are active for this
                           potential context. To keep this context, the code
                           must evaluate to a true value.

                           Combinations are evaluated in alphabetical order.
                           If there is more than one path to a combination,
                           only the first path will be considered. For
                           example, if you have three crossproduct
                           dimensions, "a", "b" and "c" then the
                           combinations are (in the order they'll be
                           considered):

                           "b" and "c", excluding "a"
                           "a" and "c", excluding "b"
                           "a" and "b", excluding "c"
                               These have no dependency and will be produced
                               if the combinations code returns a true
                               value.

                           "c", excluding "a" and "b"
                               This possibility will only be explored coming
                               from "b" and "c". If the combination rule for
                               "a" rejected the combination, then the
                               "c"-only permutation will never be reached.

                           "b", excluding "a" and "c"
                               This possibility will only be explored coming
                               from "b" and "c". If the combination rule for
                               "a" rejected the combination, then the
                               "b"-only permutation will never be reached.

                           "a", excluding "b" and "c"
                               This possibility will only be explored coming
                               from "a" and "c". If the combination rule for
                               "b" rejected the combination, then the
                               "a"-only permutation will never be reached.

                           excluding "a", "b" and "c"
                               This possibility will only be explored coming
                               from "c". If the combination rule for "a" or
                               "c" rejected the combination, then the empty
                               permutation will never be reached.

                           Using combinations can greatly expand your memory
                           requirements. More than four dimensions of
                           combinations is probably a bad idea.

                           If you want to skip a dimension entirely, leave
                           it out of the combinations key rather than adding
                           a key that evaluates to false. A key that
                           evaluates to false will still have to be tested
                           many times whereas a missing key won't have any
                           code generated for it.

    simplify               Hash, Name->Code, Optional.

                           When a cross-product key is exceeding its quota
                           of values, the default replacement value is "*".
                           This hash allows you to override the code that
                           chooses the new value.

    finalize_result        Code, Optional, Closure/Config.

                           This code will be called after the resultant
                           values for a context have been calculated. It is
                           a last chance to modify them or to suppress the
                           results. The values can be found as a reference
                           to a hash: $row. To suppress the results, set
                           $suppress_result.

    new_context            Code, Optional, Closure/Config.

                           This code will be called each time there is a new
                           context. At the time it is called, $ps is a
                           reference to the new context, but
                           @current_context will not yet have been updated
                           to the new value.

    merge                  Code, Optional, Closure/Config.

                           When using multiple levels of contexts, data is
                           counted for the top-most context layer only. When
                           that top-most layer finishes, the counts are
                           merged into the next more-general layer.

                           During the merge, both $ps and $oldps are
                           available for code to reference. The default
                           merge handles all of the pre-defined column
                           types. If you are using "$ps->{heap}" storage for
                           context data, you need to merge that data from
                           $oldps to $ps yourself.
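
    As a sketch of how "crossproduct" and "combinations" fit together,
    assume three ephemeral columns named "color", "size", and "width" are
    defined elsewhere in the configuration (the names are hypothetical). The
    fragment below always allows contexts that drop "color" or "size", and
    never generates code for dropping "width" because that key is left out:

     crossproduct:
       color: 5
       size:  10
       width: 3
     combinations:
       color: 1
       size:  1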

  CONTROL FLOW CONFIGURATION
    filter                 Code, Optional. Before any of the columns are
                           calculated or any of the values saved, run this
                           filter code. If it returns a true value then
                           proceed as normal. If it returns a false value,
                           then do not count any of the values. The filter
                           code can remember things in "$ps->{heap}" that
                           might affect how other things are counted.

                           In some situations, you may want to throw away
                           most data and count things in the filter. When
                           doing that, it may be that all of the columns
                           come from "output".

                           This may be redesigned in a future release in a
                           way that is not backwards compatible.

    filter_early           Boolean, Optional, default "false". Check the
                           filter early before figuring out contexts? If so,
                           and the result is filtered, don't check to see if
                           the context changed.

    passthrough            Code, Optional. Add results to the output of the
                           aggregation. A value of $log (assuming that's
                           your input record variable) adds the input data
                           stream to the output stream. The passthrough code
                           is run before the input line is counted.
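
    The following fragment sketches how "filter" and "passthrough" might be
    combined. It assumes input records with a hypothetical "status" field
    and a configuration with a single aggregation context; whether the heap
    counter ends up where "output" expects it in multi-level configurations
    depends on "merge" and is not covered here.

     filter: |
       # remember every record seen, but only aggregate successful ones
       $ps->{heap}{records_seen}++;
       return $log->{status} == 200 ? 1 : 0
     passthrough: $log
     output:
       records_seen: $ps->{heap}{records_seen}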

  PARAMETERS CONFIGURATION
    max_stats_to_keep      Number, Optional, default: 4000.

                           When aggregating large amounts of data, limit
                           memory use by throwing away some of the data.
                           When data is thrown away, keep this number of
                           samples for statistics functions that need bulk
                           data like standard_deviation.

    reduce                 Code, Optional, Closure/Config.

                           When "max_stats_to_keep" is exceeded, data will
                           be thrown away. This function will be called when
                           that has happened.

    preprocess             Code, Optional. Code to preprocess the input $log
                           objects.

    item_name              String, Optional, default: $log. The default name
                           for the input values to aggregate over is $log.
                           If this name is not appropriate, you can use
                           "item_name" to change the variable name of the
                           input values to something else. Include the
                           dollar sign ("$") in the name.

    debug                  Boolean, Optional. Print out some debugging
                           information, including the code that is generated
                           for building the columns.

    strict                 Boolean, Optional, default: false. Enforce strict
                           and warnings for user code.

  AGGREGATE OUTPUT COLUMNS CONFIGURATION
    Each of these (except "ephemeral" & "keep") defines additional columns
    of output that will be included in each aggregation record. These are
    all optional and all are defined as key/value pairs where the keys are
    column names and the values are perl code. You can refer to other
    columns using the variable "$column_*column_name*" where *column_name*
    is the name of one of the other columns. When referring to other
    columns, the order in which columns are processed matters: "ephemeral"
    and "keep" are processed first and second respectively. Identical code
    fragments will be evaluated only once. Within a group, columns are
    evaluated alphabetically.

    Some of the columns will have their code evaluated per-item and some are
    evaluated per-aggregation.

    The input data is in $log unless overridden by "item_name".

   Per item callbacks
    ephemeral              These columns will not be included in the
                           aggregation data. Refer to them as
                           "$column_*column_name*". If you are using
                           ephemeral to declare the column but do not want
                           to assign it a value, set the value for the
                           ephemeral code to be undef. In YAML, that's "~":

                            ephemeral:
                              var1: ~
                              var2: ~

    ephemeral0             Same as "ephemeral", will be evaluated before
                           "ephemeral".

    ephemeral2             Same as "ephemeral", will be evaluated after
                           "ephemeral".

    counter                Keep a counter. Add one if the code returns true.

    percentage             Keep a counter. The output column is the
                           percentage of items for which the code returned
                           true, out of all items for which it returned a
                           defined value. A return value of "undef" does
                           not get counted at all.

    sum                    Keep an accumulator. Add the return values.

    mean                   Keep an accumulator. Add the return values.
                           Divide by the number of items before inserting
                           into the results. Items whose value is "undef" do
                           not count towards the number of items or the sum.

    standard_deviation     Remember the return values. Compute the standard
                           deviation of the accumulated return values and
                           insert that into the results. Items whose value
                           is "undef" are removed before calculating the
                           standard_deviation.

    median                 Remember the return values. Compute the median of
                           the accumulated return values and insert that
                           into the results. Items whose value is "undef"
                           are removed before calculating the median.

    dominant               Remember the return values. Compute the mode (most
                           frequent) of the accumulated return values and
                           insert that into the results. Items whose value
                           is "undef" are removed before calculating the
                           mode.

    min                    Keep a minimum numeric value. Replace it with the
                           return value if the return value is less than the
                           current value. Items whose value is "undef" are
                           removed before calculating the min.

    max                    Keep a maximum numeric value. Replace it with the
                           return value if the return value is greater than
                           the current value. Items whose value is "undef"
                           are removed before calculating the max.

    minstr                 Keep a minimum string value. Replace it with the
                           return value if the return value is less than the
                           current value. Items whose value is "undef" are
                           removed before calculating the minstr.

    maxstr                 Keep a maximum string value. Replace it with the
                           return value if the return value is greater than
                           the current value. Items whose value is "undef"
                           are removed before calculating the maxstr.

    keep                   Remember the return values. The return values are
                           available at aggregation time as
                           "@{$ps->{keep}{column_name}}". Items whose value
                           is "undef" are kept but they're ignored by
                           Stream::Aggregate::Stats functions.
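
    A few of these column types side by side, as a sketch. The input fields
    ("status", "bytes", "seconds", "url") and all column names are
    hypothetical. Note that "$log->{bytes}" appears twice; identical code
    fragments are evaluated only once.

     counter:
       errors: $log->{status} >= 500
     percentage:
       error_pct: |
         # undef is not counted at all; 0 counts against the percentage
         return undef unless defined $log->{status};
         return $log->{status} >= 500 ? 1 : 0
     sum:
       total_bytes: $log->{bytes}
     mean:
       avg_bytes: $log->{bytes}
     min:
       fastest: $log->{seconds}
     max:
       slowest: $log->{seconds}
     maxstr:
       max_url: $log->{url}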

   Per aggregation result record callbacks
    For code that is per-aggregation, the saved aggregation state can be
    found in $ps. One item that is probably needed is "$ps->{item_counter}".

    output                 Extra columns to include in the output. This is
                           where to save "$ps->{item_counter}".

    stat                   Use arbitrary perl code to compute statistics on
                           remembered return values kept with "keep". Write
                           your own function or use any of the functions in
                           Stream::Aggregate::Stats (the global variable is
                           pre-loaded). No, there isn't any difference
                           between this and "output".
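
    As a sketch of a hand-written statistic, the fragment below keeps a
    hypothetical "latency" field with "keep" and then computes its range
    from the kept values. The column names are made up for the example.
    Because "keep" retains "undef" values, they are filtered out first.

     keep:
       latency: $log->{latency}
     output:
       sample_size: $ps->{item_counter}
     stat:
       latency_range: |
         my @v = sort { $a <=> $b }
                 grep { defined } @{ $ps->{keep}{latency} || [] };
         return @v ? $v[-1] - $v[0] : undef

    If more than "max_stats_to_keep" values arrive, some kept values are
    thrown away, so a statistic computed this way is then based on a sample
    rather than on every item.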

  VARIABLES AVAILABLE FOR CODE SNIPPETS TO USE
    The following variables are available for the code that generates
    per-item and per-aggregation statistics:

    $log                   The current input item (unless overridden by
                           "item_name")

    $ps->{keep}{column_name}
                           An array of return values kept by "keep".

    $ps->{numeric}{column_name}
                           If Stream::Aggregate::Stats functions are called,
                           they will grab the numeric values from
                           "$ps->{keep}{column_name}" and store them in
                           "$ps->{numeric}{column_name}".

    $ps->{random}          For each kept item in "$ps->{keep}{column_name}",
                           there is a corresponding item in $ps->{random}
                           that is a random number. These random numbers are
                           used to determine which values to keep and which
                           values to toss if there are too many values to
                           keep them all.

    $ps->{$column_type}{column_name}
                           For each type of column (output, counter,
                           percentage, sum, min, standard_deviation, median,
                           stat) the values that will be part of the final
                           aggregation record.

    $ps->{$temporary_type}{column_name}
                           Some columns need temporary storage for their
                           values: percentage_counter (the counter used by
                           percentage); percentage_total (the number of
                           total items); mean_sum (the sum used to compute
                           the mean); mean_count (the number of items for
                           the mean).

    $ps->{heap}            A hash that can be used by the configured perl
                           code for whatever it wants.

    $ps->{item_counter}    The count of items.

    $agg_config            The configuration object for Stream::Aggregate

    $itemref               A reference to $log. It's always $itemref even if
                           $log is something else.

    @current_context       The current context as returned by "context".

    @context_strings       The string-ified version of @current_context as
                           returned by "stringify_context" or YAML.

    @contexts              The array of context objects. $ps is always
                           $contexts[-1].

    @items_seen            An array that counts the number of rows of output
                           from this aggregation. When the context is
                           multi-level, the counter is multi-level. For
                           example, if the context is *domain*, *host*, and
                           *URL*; then $items_seen[0] is the number of
                           *domains* (so far), and $items_seen[1] is the
                           number of *hosts* for this *domain* (so far), and
                           $items_seen[2] is the number of *URLs* for this
                           *host* (so far).

                           Passthrough rows do not count.

                           XXX what about cross-product aggregations?

    $row                   When gathering results, the variable that holds
                           them is a reference to a hash: $row.

    $suppress_result       After gathering results, the $suppress_result
                           variable is examined. If it's set, the results
                           (in $row) are discarded.

                           To skip results that aren't crossproduct results,
                           in "finalize_result", set $suppress_result if
                           $cross_count isn't true.

    $cross_count           The number of currently active crossproduct
                           accumulator contexts.

    $extra, $user_extra    The additional parameters (beyond $agg_config)
                           that were passed to
                           "generate_aggregation_func()".

    %persist               This hash is not used by Stream::Aggregate. It's
                           available for any supplied code to use however it
                           wants.

    $last_item             A reference to the previous $log object. This is
                           valid during "finalize_result" and
                           "context2columns".

    There are more. Read the code.
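
    Several of these variables can be combined in "finalize_result". The
    sketch below keeps only crossproduct results, as described under
    $suppress_result, and tags each surviving record with the "name" that
    was passed in $extra. The "aggregation" column name is a hypothetical
    choice.

     finalize_result: |
       # discard anything that is not a crossproduct result
       $suppress_result = 1 unless $cross_count;
       # label the record with the name given to generate_aggregation_func()
       $row->{aggregation} = $extra->{name};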

  HELPER FUNCTIONS
    The following helper functions are available: everything in
    Stream::Aggregate::Stats and:

    nonblank($value)
        Returns $value if $value is defined and not the empty string.
        Returns undef otherwise.
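
    For instance, "nonblank" can normalize empty strings to undef so that
    column types which skip undef values ignore them. In this sketch the
    "referrer" input field and the column names are hypothetical:

     ephemeral:
       referrer: nonblank($log->{referrer})
     counter:
       has_referrer: defined $column_referrer
     dominant:
       top_referrer: $column_referrer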

  EXAMPLE1
    This example will look at a set of records regarding health risks. Each
    record represents a person:

     name<TAB>birthday<TAB>sex<TAB>number of hospital visits in the last year

    We will generate the following aggregation records:

    *   Average age of entire sample

    *   Median number of hospital visits in the last year for the entire
        sample.

    *   Median number of hospital visits in the last year by sex.

    *   Median number of hospital visits in the last year by age, with a
        minimum sample size of five.

    *   Median number of hospital visits in the last year by age and sex with
        a minimum sample size of five.

    The code:

      #!/usr/bin/perl 

      #
      # Our input data is the raw strings from the input file.  Most of the
      # work is parsing them and reformatting the data.
      #

      use strict;
      use warnings;
      use Stream::Aggregate;
      use YAML;

      my $aconfig = Load(<<'END_ACONFIG');
      strict:                 1
      debug:                  0
      item_name:              $record
      max_stats_to_keep:      500
      filter_early:           1
      filter: |
        # ignore blank lines and comments
        return 0 if $record =~ /^#/;
        return 0 if $record =~ /^$/;
        return 1;
      crossproduct:
        sex:                  3
        age:                  150
      combinations:
        sex:                  1
        age:                  1
      ephemeral0:
        #
        # We are using ephemeral0 to declare the column variables
        #
        name:                 ~
        birthday:             ~
        gender:               ~
        number_of_visits:     ~
      ephemeral:
        # 
        # We are using a fake column ($column_step1) in ephemeral to initialize 
        # the raw column variables we declared in ephemeral0
        #
        step1: |
           chomp($record);
           ($column_name, $column_birthday, $column_gender, $column_number_of_visits) = split(/\t/, $record);
      ephemeral2:
        #
        # We are using ephemeral2 to generate the computed input data
        #
        age: |
          use Time::ParseDate qw(parsedate);
          my $t = parsedate($column_birthday, NO_RELATIVE => 1, DATE_REQUIRED => 1, WHOLE => 1, GMT => 1);
          return undef unless $t;
          return int ((parsedate('2011-05-01', GMT => 1) - $t) / (365.24 * 86400))
        sex: |
          return 'M' if $column_gender =~ /^m/i;
          return 'F' if $column_gender =~ /^f/i;
          return undef;
        hospital_visits: |
          # return undef on a failed match rather than a stale $1
          return $column_number_of_visits =~ /^(\d+)$/ ? $1 : undef;
      output:
        sample_size:          $ps->{item_counter}
      median:
        avg_hospital_visits:  $column_hospital_visits
      mean:                   
        avg_age:              $column_age
      finalize_result: |
        #
        # Don't generate result records unless there are at
        # least five items being aggregated.
        #
        $suppress_result = 1 if $ps->{item_counter} < 5;
      END_ACONFIG

      my $ag = generate_aggregation_func($aconfig, { 
              name  => 'Aggregate Hospital Visits',
      });

      my @results;

      while (<>) {
              for my $result ($ag->($_)) {
                      # do something with the result records
              }
      }

      for my $result ($ag->(undef)) {
              # do something with the result records
      }

  EXAMPLE2
    Our example will count the following things: number of unique URLs per
    domain, average length of the URL.

      #!/usr/bin/perl 

      use strict;
      use warnings;
      use Stream::Aggregate;
      use YAML;

      my $aconfig = Load(<<'END_ACONFIG');
      strict:                 1
      debug:                  0
      item_name:              $item
      context:                $item->{domain}
      context2columns:        return (domain => $current_context[0])
      ephemeral:
        # 
        # The only persistent unstructured place to store data from 
        # one row to the next is $ps->{heap}.   $ps->{heap} 
        # is per-context, but that's okay for our usage.
        #
        is_different: |
          my $old = $ps->{heap}{last_item};
          $ps->{heap}{last_item} = $item;
          return 1 unless $old;
          return 0 if $old->{url} eq $item->{url};
          return 1;
      sum:
        unique_urls:          $column_is_different
      mean:                   
        avg_url_length:       length($item->{url})
      finalize_result: |
        # we don't want the roll-up context of all domains
        $suppress_result = 1 unless $row->{domain};
      END_ACONFIG

      my $ag = generate_aggregation_func($aconfig, { 
              name  => 'Aggregate URL data'
      });

      while(<>) {
              # we'll parse the input here
              my $item;
              chomp;
              next if /^$/;
              next if /^#/;
              die "'$_'" unless m{^\w+:\/\/([^/]+)(?:/.*)?};
              $item = {
                      domain => $1,
                      url    => $_,
              };
              for my $result ($ag->($item)) {
                      # do something with the result records
              }
      }

      for my $result ($ag->(undef)) {
              # do something with the result records
      }

LICENSE
    Copyright (C) 2008-2010 David Sharnoff; Copyright (C) 2011 Google, Inc.

    This package may be used and redistributed under the terms of either the
    Artistic 2.0 or LGPL 2.1 license.
