In [2]:
from toolz import compose, pipe
from functools import reduce
from itertools import repeat
from toolz.curried import curry, peek, drop, juxt
# New functional ideas
# Adapter that lets a multi-argument function consume a single tuple:
# star(f)(tup) == f(*tup).
@curry
def star(f, tup):
    """Call ``f`` with the elements of ``tup`` unpacked as positional arguments."""
    return f(*tup)
# Predicate combinators.
# The earlier `compose(all, juxt)` form evaluated `all(juxt(f, g))`, i.e. it
# called all() on the juxt object itself (not iterable -> TypeError) instead of
# on the predicates' results.  The versions below build the combined predicate
# first and evaluate the individual predicates only when it is called.
def compose_all(*preds):
    """Return a predicate that is True when every predicate in ``preds`` is True.

    The combined predicate forwards its arguments to each predicate in order
    and stops at the first falsy result.  With no predicates it is always True.
    """
    return lambda *args, **kwargs: all(p(*args, **kwargs) for p in preds)
# Note: compose(filter(f), filter(g)) == filter(compose_all(f,g))
def compose_any(*preds):
    """Return a predicate that is True when at least one predicate in ``preds`` is True.

    The combined predicate forwards its arguments to each predicate in order
    and stops at the first truthy result.  With no predicates it is always False.
    """
    return lambda *args, **kwargs: any(p(*args, **kwargs) for p in preds)
# Filter and map in a single lazy pass: keep the items accepted by `pred`,
# then apply `func` to each item that was kept.
@curry
def filtermap(pred, func, seq):
    """Lazily yield ``func(item)`` for every ``item`` of ``seq`` for which ``pred(item)`` is truthy."""
    return (func(item) for item in seq if pred(item))
# Note invariance: compose(map(g), filter(f)) == filtermap(f, g)
# (compose applies its functions right-to-left, so the filter runs before the map.)

# A function for performing multiple reductions simultaneously
from toolz import peek, drop
from itertools import repeat

@curry
def folds(fns, seq, inits = None):
    """ Perform multiple reductions/folds simultaneously, returning a list of accumulators.
    
    Parameters
    - fns: A sequence of update functions, e.g. for use in reduce(fn, seq, init)
    - seq: A sequence (possibly lazy); it is traversed exactly once
    - inits: Optional sequence of initial values, one per update function
    
    Returns
    - A list with one accumulator per update function, in the order of fns.
    
    Raises
    - TypeError: if seq is empty and at least one accumulator has no initial
      value (same convention as functools.reduce).
    
    Note:  If inits = None, the first element of the sequence will be 
    used as an initial value for each accumulator.  If inits is a sequence, 
    each value is either the desired initial value or None.  
    When an element of inits is None, the first value in the sequence will be 
    used as the initial value for that particular accumulator.
    
    Example:
    
    total, count, max_, min_ = folds((lambda a, i: a + i, 
                                      lambda a, i: a + 1,
                                      lambda a, i: a if a >= i else i,
                                      lambda a, i: a if a <= i else i),
                                      map(lambda x: x**2, range(5)),
                                      inits = (0,0, None, None))
    
    """
    fns = tuple(fns)
    # Normalise: "no inits at all" is the same as "None for every accumulator".
    inits = (None,) * len(fns) if inits is None else tuple(inits)
    items = iter(seq)
    try:
        head = next(items)
    except StopIteration:
        # Empty input: only well defined when every accumulator has a seed.
        if any(a is None for a in inits):
            raise TypeError("folds() of empty sequence with no initial value") from None
        return list(inits)
    # Seed each accumulator: either the first element itself, or the explicit
    # initial value already combined with the first element.
    accs = [head if a is None else fn(a, head)
            for a, fn in zip(inits, fns)]
    for item in items:
        accs = [fn(a, item) for a, fn in zip(accs, fns)]
    return accs

# Demo: total, count, max and min of 0, 1, 2 computed in a single pass.
# The two None entries in inits seed max/min from the first element.
folds((lambda a, i: a + i, 
       lambda a, i: a + 1,
       lambda a, i: a if a >= i else i,
       lambda a, i: a if a <= i else i),
      map(lambda x: x, range(3)),
      inits = (0,0, None, None))

[3, 3, 2, 0]

In [3]:
### Splitting the stream
from toolz.curried import curry, map
from toolz import compose, juxt, pipe
from more_itertools import side_effect
# Rebind the imported more_itertools.side_effect to its curried form.
side_effect = curry(side_effect)

# Sample data and small numeric helpers used by the stream-splitting demos.
L = [1, 2, 3, 4]


def double(n):
    """Return twice ``n``."""
    return 2 * n


def sqr(n):
    """Return ``n`` squared."""
    return n ** 2


def add2(n):
    """Return ``n`` plus two."""
    return n + 2

In [4]:
# Scratch experiment: `test` just forwards its arguments to juxt.
test = lambda *fs: juxt(*fs)
test(double, sqr)
# NOTE(review): this passes the list of ints L, not functions; only the juxt
# object is built here (it is never called), so nothing fails yet — confirm
# whether this call is still wanted.
test(L)

<toolz.functoolz.juxt at 0x10813bc18>

In [51]:
from functools import reduce
def maybe_apply(f, x):
    """Return ``f(x)``, or ``x`` unchanged when ``f`` is None."""
    if f is None:
        return x
    return f(x)
@curry
def split_item(n, x):
    """Return a tuple containing ``x`` repeated ``n`` times."""
    return tuple(repeat(x, n))


def split(*fs):
    """Build a map stage that fans every item out into one channel per entry of ``fs``.

    Each channel holds ``f(item)``; a ``None`` entry passes the item through unchanged.
    """
    def fan_out(x):
        return tuple(maybe_apply(f, x) for f in fs)
    return map(fan_out)


def channels(*fs):
    """Return a function that applies ``fs[k]`` to the k-th element of a tuple.

    A ``None`` entry leaves the corresponding element untouched.  Pairing is
    done with zip, so the result is as long as the shorter of ``fs`` and the tuple.
    """
    def apply_channels(tup):
        return tuple(item if f is None else f(item)
                     for f, item in zip(fs, tup))
    return apply_channels
channels(None, add2, sqr, None)((1,2,3,4))

(1, 4, 9, 4)

In [6]:
# Fan each element of L out into four channels (add2, sqr, double, pass-through),
# then post-process channels 2 and 3 with add2 and sqr respectively.
pipe(L,
    split(add2, sqr, double, None),
    map(channels(None, add2, sqr, None)),
     list)

[(3, 3, 4, 1), (4, 6, 16, 2), (5, 11, 36, 3), (6, 18, 64, 4)]

In [7]:
def channelMap(*fs):
    """Return a map stage that applies ``channels(*fs)`` to every tuple of a stream."""
    per_tuple = channels(*fs)
    return map(per_tuple)

In [8]:
# Same pipeline as the previous cell, using channelMap in place of map(channels(...)).
pipe(L,
    split(add2, sqr, double, None),
    channelMap(None, add2, sqr, None),
     list)

[(3, 3, 4, 1), (4, 6, 16, 2), (5, 11, 36, 3), (6, 18, 64, 4)]

In [9]:
def unsplit(binop, init=None):
    """Return a map stage that collapses each tuple of a stream with ``binop``.

    Parameters
    - binop: binary reduction update, as used by functools.reduce
    - init: optional initial value; when None the first element of each tuple
      seeds the reduction (the same convention as partialReduce)

    Previously the default None was handed to reduce as a real initial value,
    so e.g. unsplit(add) failed with ``None + x``.
    """
    if init is None:
        merge = lambda tup: reduce(binop, tup)
    else:
        merge = lambda tup: reduce(binop, tup, init)
    return map(merge)

In [10]:
from operator import add
# Split, transform per channel, then sum the four channels of every tuple back
# into a single number (starting from 0).
pipe(L,
    split(add2, sqr, double, None),
    channelMap(None, add2, sqr, None),
    unsplit(add, 0),
    list)

[11, 28, 55, 92]

In [11]:
from toolz.sandbox import unzip
# unzip transposes the stream of 4-tuples into one iterator per channel;
# the result below is a tuple of four (unconsumed) map objects.
pipe(L,
     split(  add2,  sqr,  double, None),
     channelMap( None,  add2, sqr,    None),
     unzip)

(<map at 0x1081645c0>,
 <map at 0x108164940>,
 <map at 0x108164710>,
 <map at 0x1081642e8>)

In [12]:
def count(a, i):
    """Reduction update that counts items: ignore ``i`` and return ``a + 1``."""
    return a + 1
# After unzip, materialise every channel iterator as a list.
pipe(L,
     split(  add2,  sqr,  double, None),
     channelMap( None,  add2, sqr,    None),
     unzip,
     channels(list, list, list, list))

([3, 4, 5, 6], [3, 6, 11, 18], [4, 16, 36, 64], [1, 2, 3, 4])

In [13]:
from operator import add
from toolz.sandbox import unzip
def totaler(seq):
    """Sum the items of ``seq`` with ``add``, starting from 0."""
    return reduce(add, seq, 0)


def counter(seq):
    """Count the items of ``seq`` (starts at 0 and adds one per item)."""
    return reduce(lambda acc, _item: acc + 1, seq, 0)
# Reduce each channel independently: totals for channels 1 and 3,
# item counts for channels 2 and 4.
pipe(L,
     split(  add2,  sqr,  double, None),
     channelMap( None,  add2, sqr,    None),
     unzip,
     channels(totaler,  counter, totaler,  counter))

(18, 4, 120, 4)

In [14]:
# Application 1 - simultaneous reduction
from itertools import repeat

@curry
def partialReduce(func, init=None):
    """Return a one-argument reducer ``seq -> reduce(func, seq[, init])``.

    When ``init`` is None the reduction is seeded by the first element of the
    sequence; otherwise ``init`` is used as the starting accumulator.
    """
    def fold_from_first(seq):
        return reduce(func, seq)

    def fold_from_init(seq):
        return reduce(func, seq, init)

    return fold_from_first if init is None else fold_from_init
    
def channelReduce(*helpers, init=None):
    """Build a function that reduces every channel of a stream of tuples at once.

    Parameters
    - helpers: one reduction update function per channel
    - init: optional sequence of initial values, one per channel; a None entry
      (or omitting init altogether) seeds that channel from its first element

    Returns a function that unzips a stream of tuples into channels and applies
    the matching reducer to each, yielding a tuple of results.
    """
    # itertools.repeat takes (object, times).  The arguments used to be swapped
    # (repeat(len(helpers), None)), which raised TypeError whenever init was omitted.
    inits = init if init is not None else repeat(None, len(helpers))
    rs = star(channels)(map(partialReduce, helpers, inits))
    return compose(rs, unzip)

# One-pass sum / count / max / min over the four channels; max and min have no
# explicit initial value (None) and are seeded from the first element.
pipe(L,
     split(  add2,  sqr,  double, None),
     channelMap( None,  add2, sqr,    None),
     channelReduce(add, count, max, min, init=(0,0,None,None)))

(18, 4, 64, 1)

In [15]:
from operator import add, truediv
def multiReduce(*fs, init=None):
    """Apply several reductions to one stream by first copying each item into ``len(fs)`` channels.

    ``fs`` are the reduction update functions and ``init`` their optional
    initial values; both are forwarded to channelReduce.
    """
    reducer = channelReduce(*fs, init=init)
    duplicate = split(*repeat(None, len(fs)))
    return compose(reducer, duplicate)

# Mean of 2..6 as (sum, count) reduced in one pass and then divided.
# NOTE(review): `fs` is assigned but not used in this cell.
fs = (add, count)
pipe(range(5),
     map(add2),
     multiReduce(add, count, init=(None, 0)),
     star(truediv))

4.0

In [16]:
# imean: single-pass mean of an iterable — reduce to (sum, count), then divide.
imean = compose(star(truediv), multiReduce(add, count, init=(None, 0)))
# NOTE(review): `fs` is assigned but not used in this cell.
fs = (add, count)
pipe(range(5),
     map(add2),
     imean)

4.0

In [17]:
# Interactive lookup of itertools.repeat's signature: repeat(object [,times]).
help(repeat)

Help on class repeat in module itertools:

class repeat(builtins.object)
 |  repeat(object [,times]) -> create an iterator which returns the object
 |  for the specified number of times.  If not specified, returns the object
 |  endlessly.
 |  
 |  Methods defined here:
 |  
 |  __getattribute__(self, name, /)
 |      Return getattr(self, name).
 |  
 |  __iter__(self, /)
 |      Implement iter(self).
 |  
 |  __length_hint__(...)
 |      Private method returning an estimate of len(list(it)).
 |  
 |  __new__(*args, **kwargs) from builtins.type
 |      Create and return a new object.  See help(type) for accurate signature.
 |  
 |  __next__(self, /)
 |      Implement next(self).
 |  
 |  __reduce__(...)
 |      Return state information for pickling.
 |  
 |  __repr__(self, /)
 |      Return repr(self).



In [18]:
max(map(add2, range(5)))

6

## Using DictReader to represent table rows

In [19]:
from csv import DictReader
from more_itertools import with_iter
from toolz import compose, pipe
from toolz.curried import *
# read_csv(path): compose runs right-to-left, so this is
# DictReader(with_iter(open(path))) — rows come back as dictionaries.
# NOTE(review): open() is called without newline='' (which the csv module
# documentation recommends for file objects) — confirm that is acceptable here.
read_csv = compose(DictReader, with_iter, open)

In [20]:
# Read the first five rows of Batting.csv as dictionaries.
example = pipe('Batting.csv',
                read_csv,
                take(5),
                list)
example

[{'2B': '0',
  '3B': '0',
  'AB': '4',
  'BB': '0',
  'CS': '0',
  'G': '1',
  'GIDP': '',
  'H': '0',
  'HBP': '',
  'HR': '0',
  'IBB': '',
  'R': '0',
  'RBI': '0',
  'SB': '0',
  'SF': '',
  'SH': '',
  'SO': '0',
  'lgID': 'NA',
  'playerID': 'abercda01',
  'stint': '1',
  'teamID': 'TRO',
  'yearID': '1871'},
 {'2B': '6',
  '3B': '0',
  'AB': '118',
  'BB': '4',
  'CS': '1',
  'G': '25',
  'GIDP': '',
  'H': '32',
  'HBP': '',
  'HR': '0',
  'IBB': '',
  'R': '30',
  'RBI': '13',
  'SB': '8',
  'SF': '',
  'SH': '',
  'SO': '0',
  'lgID': 'NA',
  'playerID': 'addybo01',
  'stint': '1',
  'teamID': 'RC1',
  'yearID': '1871'},
 {'2B': '4',
  '3B': '5',
  'AB': '137',
  'BB': '2',
  'CS': '1',
  'G': '29',
  'GIDP': '',
  'H': '40',
  'HBP': '',
  'HR': '0',
  'IBB': '',
  'R': '28',
  'RBI': '19',
  'SB': '3',
  'SF': '',
  'SH': '',
  'SO': '5',
  'lgID': 'NA',
  'playerID': 'allisar01',
  'stint': '1',
  'teamID': 'CL1',
  'yearID': '1871'},
 {'2B': '10',
  '3B': '2',
  'AB': '13

## Select

- like `pluck` but returns dictionaries
- give a list of column_ids and return filtered rows

In [21]:
@curry
def select_row(keys, row):
    """Return the sub-dictionary of ``row`` restricted to ``keys``.

    ``keys`` is either a list of column labels or a single label (anything that
    is not a list is compared to each key with ``==``).
    """
    many = isinstance(keys, list)

    def is_selected(key):
        if many:
            return key in keys
        return key == keys

    return keyfilter(is_selected, row)

In [22]:
select_row('H', example[0] )

{'H': '0'}

In [23]:
select_row(['playerID', 'H'], example[0])

{'H': '0', 'playerID': 'abercda01'}

In [24]:
@curry
def select(keys, seq):
    """Lazily apply ``select_row(keys)`` to every row of ``seq``."""
    return map(select_row(keys), seq)
list(select(['playerID', 'H'], example))

[{'H': '0', 'playerID': 'abercda01'},
 {'H': '32', 'playerID': 'addybo01'},
 {'H': '40', 'playerID': 'allisar01'},
 {'H': '44', 'playerID': 'allisdo01'},
 {'H': '39', 'playerID': 'ansonca01'}]

## Filtering the rows

- Use `filter`
- Helper functions built for dictionary rows

In [25]:
# NOTE(review): no `import operator` is visible in this notebook; the name
# presumably arrives through `from toolz.curried import *` — confirm.
lt = operator.lt


def maybe_int(i):
    """Convert the string ``i`` to int, treating the empty string as 0."""
    return int(i) if len(i) > 0 else 0


def at_least_500_AB(row):
    """Return True when the row's 'AB' column, read as an int, is 500 or more.

    The earlier form used lt(500), i.e. ``500 < AB``, which rejected rows with
    exactly 500 at-bats even though the name promises "at least 500".
    """
    return maybe_int(row['AB']) >= 500
# Keep only playerID / H / AB for the rows accepted by at_least_500_AB and
# materialise them; the last line previews the first ten rows.
player_AB_H = pipe('Batting.csv',
                    read_csv,
                    select(['playerID', 'H', 'AB']),
                    filter(at_least_500_AB),
                    list)
     
player_AB_H[:10]

[{'AB': '521', 'H': '161', 'playerID': 'dalryab01'},
 {'AB': '518', 'H': '139', 'playerID': 'hornujo01'},
 {'AB': '504', 'H': '187', 'playerID': 'ansonca01'},
 {'AB': '522', 'H': '118', 'playerID': 'bierblo01'},
 {'AB': '578', 'H': '147', 'playerID': 'comisch01'},
 {'AB': '540', 'H': '143', 'playerID': 'corkhpo01'},
 {'AB': '524', 'H': '141', 'playerID': 'gleasbi01'},
 {'AB': '522', 'H': '126', 'playerID': 'hankifr01'},
 {'AB': '565', 'H': '180', 'playerID': 'larkihe01'},
 {'AB': '578', 'H': '174', 'playerID': 'lathaar01'}]

## Column conversions

- give a dictionary of column_label:convert_func
- map the convert_func to corresponding entries

In [26]:
def maybe_convert(func_dict, key, val):
    """Return ``func_dict[key](val)`` when ``key`` has a converter, otherwise ``val`` unchanged."""
    if key not in func_dict:
        return val
    converter = func_dict[key]
    return converter(val)
@curry
def convert_row(func_dict, row):
    """Return a new dict where each value of ``row`` is converted by the function registered for its key.

    Keys without an entry in ``func_dict`` keep their original value.
    """
    converted = {}
    for key, val in row.items():
        converted[key] = maybe_convert(func_dict, key, val)
    return converted

In [27]:
convert_row({'H':int}, player_AB_H[1])

{'AB': '518', 'H': 139, 'playerID': 'hornujo01'}

In [28]:
convert_row({'H':maybe_int, 'AB':maybe_int}, player_AB_H[1])

{'AB': 518, 'H': 139, 'playerID': 'hornujo01'}

In [29]:
@curry
def transform(func_dict, seq):
    """Lazily apply ``convert_row(func_dict)`` to every row of ``seq``."""
    return map(convert_row(func_dict), seq)
list(transform({'H':int, 'AB':int}, player_AB_H))

[{'AB': 521, 'H': 161, 'playerID': 'dalryab01'},
 {'AB': 518, 'H': 139, 'playerID': 'hornujo01'},
 {'AB': 504, 'H': 187, 'playerID': 'ansonca01'},
 {'AB': 522, 'H': 118, 'playerID': 'bierblo01'},
 {'AB': 578, 'H': 147, 'playerID': 'comisch01'},
 {'AB': 540, 'H': 143, 'playerID': 'corkhpo01'},
 {'AB': 524, 'H': 141, 'playerID': 'gleasbi01'},
 {'AB': 522, 'H': 126, 'playerID': 'hankifr01'},
 {'AB': 565, 'H': 180, 'playerID': 'larkihe01'},
 {'AB': 578, 'H': 174, 'playerID': 'lathaar01'},
 {'AB': 556, 'H': 124, 'playerID': 'mannija01'},
 {'AB': 595, 'H': 152, 'playerID': 'mcclebi01'},
 {'AB': 560, 'H': 150, 'playerID': 'mcphebi01'},
 {'AB': 579, 'H': 190, 'playerID': 'oneilti01'},
 {'AB': 571, 'H': 193, 'playerID': 'orrda01'},
 {'AB': 585, 'H': 160, 'playerID': 'phillbi01'},
 {'AB': 597, 'H': 156, 'playerID': 'pinknge01'},
 {'AB': 538, 'H': 189, 'playerID': 'richaha01'},
 {'AB': 559, 'H': 127, 'playerID': 'rosemch01'},
 {'AB': 560, 'H': 117, 'playerID': 'sommejo01'},
 {'AB': 503, 'H': 156,

## Merge rows

- like `merge_with` but allows different merge functions per column
- Arguments: merge_funcs - dictionary
    - keys = col label
    - values = merge functions

In [31]:
# Select, filter and convert the rows, keep the first 1000 of them,
# then group the rows by playerID (player -> list of row dicts).
grouped_example = pipe('Batting.csv',
                       read_csv,
                       select(['playerID', 'H', 'AB']),
                       filter(at_least_500_AB),
                       transform({'AB':int, 'H':maybe_int}),
                       take(1000),
                       groupby(get('playerID')))
grouped_example

{'abbated01': [{'AB': 579, 'H': 148, 'playerID': 'abbated01'},
  {'AB': 610, 'H': 170, 'playerID': 'abbated01'}],
 'abbeych01': [{'AB': 523, 'H': 164, 'playerID': 'abbeych01'},
  {'AB': 511, 'H': 141, 'playerID': 'abbeych01'}],
 'abstebi01': [{'AB': 512, 'H': 133, 'playerID': 'abstebi01'}],
 'allenbo01': [{'AB': 563, 'H': 128, 'playerID': 'allenbo01'}],
 'alperwh01': [{'AB': 558, 'H': 130, 'playerID': 'alperwh01'}],
 'altizda01': [{'AB': 540, 'H': 145, 'playerID': 'altizda01'}],
 'anderjo01': [{'AB': 576, 'H': 190, 'playerID': 'anderjo01'},
  {'AB': 524, 'H': 149, 'playerID': 'anderjo01'},
  {'AB': 550, 'H': 156, 'playerID': 'anderjo01'},
  {'AB': 558, 'H': 155, 'playerID': 'anderjo01'},
  {'AB': 583, 'H': 158, 'playerID': 'anderjo01'}],
 'andreed01': [{'AB': 528, 'H': 126, 'playerID': 'andreed01'}],
 'ansonca01': [{'AB': 504, 'H': 187, 'playerID': 'ansonca01'},
  {'AB': 515, 'H': 177, 'playerID': 'ansonca01'},
  {'AB': 518, 'H': 161, 'playerID': 'ansonca01'},
  {'AB': 504, 'H': 157, '

In [35]:
# Take the rows of the third (player, rows) pair; which player that is depends
# on the iteration order of grouped_example.
one_grouped_set = first(drop(2, grouped_example.items()))[1]
one_grouped_set

[{'AB': 529, 'H': 177, 'playerID': 'milledu01'},
 {'AB': 504, 'H': 162, 'playerID': 'milledu01'},
 {'AB': 586, 'H': 175, 'playerID': 'milledu01'}]

In [36]:
# One-liner (curried lambda) forms; both names are redefined with `def` in the next cell.
# maybe_merge: merge val1 and val2 with the function registered for `key`;
# when no function is registered or val1 is None, val2 wins.
maybe_merge = curry(lambda merge_funcs, key, val1, val2: get(key, merge_funcs)(val1, val2) if key in merge_funcs and val1 is not None else val2)
# merge_helper: merge two row dicts key by key, iterating over the keys of next_dict
# (a key missing from last_dict is looked up as None).
merge_helper = curry(lambda merge_funcs, last_dict, next_dict: {k:maybe_merge(merge_funcs, k, get(k, last_dict, None), v)
                                                         for k, v in next_dict.items()})
merge_funcs = {'H':add, 'AB':max}
merge_helper(merge_funcs, one_grouped_set[0], one_grouped_set[1])

{'AB': 529, 'H': 339, 'playerID': 'milledu01'}

In [70]:
def maybe_merge(merge_funcs, key, val1, val2):
    """Combine ``val1`` and ``val2`` with the merge function registered for ``key``.

    ``val2`` is returned as is when ``key`` has no merge function or when
    ``val1`` is None (i.e. there is no accumulated value yet).
    """
    if key not in merge_funcs or val1 is None:
        return val2
    merge = merge_funcs[key]
    return merge(val1, val2)
@curry
def merge_helper(merge_funcs, last_dict, next_dict):
    """Merge ``next_dict`` into ``last_dict`` key by key and return a new dict.

    Only the keys of ``next_dict`` appear in the result; a key missing from
    ``last_dict`` is treated as having no accumulated value (None).
    """
    merged = {}
    for key, val in next_dict.items():
        previous = get(key, last_dict, None)
        merged[key] = maybe_merge(merge_funcs, key, previous, val)
    return merged

In [71]:
# 'AB' has no previous value in init, so the new value (10) is kept;
# 'H' starts at 0 and is advanced by count to 1.
merge_funcs = {'AB':add, 'H': count}
init = {'H':0}
next_row = {'AB':10, 'H':9}
merge_helper(merge_funcs, init, next_row)

{'AB': 10, 'H': 1}

In [72]:
# Reduction update that counts items: ignores the item and adds one.
count = lambda a, i: a + 1
@curry
def merge_rows(merge_funcs, rows, init = None):
    """Fold a sequence of row dicts into one dict using per-column merge functions.

    Parameters
    - merge_funcs: dict mapping column label -> binary merge function
    - rows: sequence of row dicts
    - init: optional dict of initial values per column; columns without an
      entry are seeded from the first row.  Defaults to an empty dict.

    The default used to be a shared mutable ``{}`` which was returned itself
    when ``rows`` was empty; a fresh dict is now created on every call.
    """
    start = {} if init is None else init
    return reduce(merge_helper(merge_funcs), rows, start)
merge_rows({'AB':add, 'H':count},one_grouped_set, init={'H':0})

{'AB': 555, 'H': 1, 'playerID': 'osborfr02'}

In [74]:
# Group by player, then merge each group's rows: 'AB' becomes the number of
# rows (count, starting at 0) and 'H' the sum of hits.
merge_funcs = {'AB':count, 'H': add}
grouped_and_merged_example = pipe('Batting.csv',
                       read_csv,
                       select(['playerID', 'H', 'AB']),
                       filter(at_least_500_AB),
                       transform({'AB':int, 'H':maybe_int}),
                       take(1000),
                       groupby(get('playerID')),
                       valmap(merge_rows(merge_funcs, init={'AB':0})))
grouped_and_merged_example

{'abbated01': {'AB': 2, 'H': 318, 'playerID': 'abbated01'},
 'abbeych01': {'AB': 2, 'H': 305, 'playerID': 'abbeych01'},
 'abstebi01': {'AB': 1, 'H': 133, 'playerID': 'abstebi01'},
 'allenbo01': {'AB': 1, 'H': 128, 'playerID': 'allenbo01'},
 'alperwh01': {'AB': 1, 'H': 130, 'playerID': 'alperwh01'},
 'altizda01': {'AB': 1, 'H': 145, 'playerID': 'altizda01'},
 'anderjo01': {'AB': 5, 'H': 808, 'playerID': 'anderjo01'},
 'andreed01': {'AB': 1, 'H': 126, 'playerID': 'andreed01'},
 'ansonca01': {'AB': 6, 'H': 991, 'playerID': 'ansonca01'},
 'babbch01': {'AB': 1, 'H': 138, 'playerID': 'babbch01'},
 'bakerfr01': {'AB': 1, 'H': 165, 'playerID': 'bakerfr01'},
 'barclge01': {'AB': 1, 'H': 163, 'playerID': 'barclge01'},
 'barreji01': {'AB': 5, 'H': 815, 'playerID': 'barreji01'},
 'barrysh01': {'AB': 2, 'H': 308, 'playerID': 'barrysh01'},
 'bassech01': {'AB': 1, 'H': 136, 'playerID': 'bassech01'},
 'batchem01': {'AB': 1, 'H': 143, 'playerID': 'batchem01'},
 'batesjo02': {'AB': 1, 'H': 127, 'playerI

In [76]:
# Same aggregation as the groupby + valmap version, expressed as a single
# reduceby over the stream.
merge_funcs = {'AB':count, 'H': add}
grouped_and_merged_example = pipe('Batting.csv',
                       read_csv,
                       select(['playerID', 'H', 'AB']),
                       filter(at_least_500_AB),
                       transform({'AB':int, 'H':maybe_int}),
                       take(1000),
                       reduceby(get('playerID'), merge_helper(merge_funcs), init={'AB':0}))
grouped_and_merged_example

{'abbated01': {'AB': 2, 'H': 318, 'playerID': 'abbated01'},
 'abbeych01': {'AB': 2, 'H': 305, 'playerID': 'abbeych01'},
 'abstebi01': {'AB': 1, 'H': 133, 'playerID': 'abstebi01'},
 'allenbo01': {'AB': 1, 'H': 128, 'playerID': 'allenbo01'},
 'alperwh01': {'AB': 1, 'H': 130, 'playerID': 'alperwh01'},
 'altizda01': {'AB': 1, 'H': 145, 'playerID': 'altizda01'},
 'anderjo01': {'AB': 5, 'H': 808, 'playerID': 'anderjo01'},
 'andreed01': {'AB': 1, 'H': 126, 'playerID': 'andreed01'},
 'ansonca01': {'AB': 6, 'H': 991, 'playerID': 'ansonca01'},
 'babbch01': {'AB': 1, 'H': 138, 'playerID': 'babbch01'},
 'bakerfr01': {'AB': 1, 'H': 165, 'playerID': 'bakerfr01'},
 'barclge01': {'AB': 1, 'H': 163, 'playerID': 'barclge01'},
 'barreji01': {'AB': 5, 'H': 815, 'playerID': 'barreji01'},
 'barrysh01': {'AB': 2, 'H': 308, 'playerID': 'barrysh01'},
 'bassech01': {'AB': 1, 'H': 136, 'playerID': 'bassech01'},
 'batchem01': {'AB': 1, 'H': 143, 'playerID': 'batchem01'},
 'batesjo02': {'AB': 1, 'H': 127, 'playerI

## Groupby

In [184]:
# Rebuilds grouped_example with the same pipeline as the earlier "Merge rows" cell.
grouped_example = pipe('Batting.csv',
                       read_csv,
                       select(['playerID', 'H', 'AB']),
                       filter(at_least_500_AB),
                       transform({'AB':int, 'H':maybe_int}),
                       take(1000),
                       groupby(get('playerID')))
grouped_example

{'abbated01': [{'AB': 579, 'H': 148, 'playerID': 'abbated01'},
  {'AB': 610, 'H': 170, 'playerID': 'abbated01'}],
 'abbeych01': [{'AB': 523, 'H': 164, 'playerID': 'abbeych01'},
  {'AB': 511, 'H': 141, 'playerID': 'abbeych01'}],
 'abstebi01': [{'AB': 512, 'H': 133, 'playerID': 'abstebi01'}],
 'allenbo01': [{'AB': 563, 'H': 128, 'playerID': 'allenbo01'}],
 'alperwh01': [{'AB': 558, 'H': 130, 'playerID': 'alperwh01'}],
 'altizda01': [{'AB': 540, 'H': 145, 'playerID': 'altizda01'}],
 'anderjo01': [{'AB': 576, 'H': 190, 'playerID': 'anderjo01'},
  {'AB': 524, 'H': 149, 'playerID': 'anderjo01'},
  {'AB': 550, 'H': 156, 'playerID': 'anderjo01'},
  {'AB': 558, 'H': 155, 'playerID': 'anderjo01'},
  {'AB': 583, 'H': 158, 'playerID': 'anderjo01'}],
 'andreed01': [{'AB': 528, 'H': 126, 'playerID': 'andreed01'}],
 'ansonca01': [{'AB': 504, 'H': 187, 'playerID': 'ansonca01'},
  {'AB': 515, 'H': 177, 'playerID': 'ansonca01'},
  {'AB': 518, 'H': 161, 'playerID': 'ansonca01'},
  {'AB': 504, 'H': 157, '

## Aggregate


- Take a key function and one of the following aggregation specifications:
- Option 1: a single reduction update function.  In this case, the reduction will be applied to all columns.
- Option 2: a dictionary of update functions, with zero or one update function per column
- Option 3: a dictionary of lists of reduction functions (one or more per column)
    - keys are column labels
    - values are lists of reduction update functions
- This is a specialized `reduceby`
- Return the aggregated dictionary

In [69]:
# Take the rows of the fourth (player, rows) pair; which player that is depends
# on the iteration order of grouped_example.
one_grouped_set = first(drop(3, grouped_example.items()))[1]
one_grouped_set

[{'AB': 555, 'H': 148, 'playerID': 'osborfr02'}]

In [117]:
def updates(*fns):
    """Bundle several binary reduction updates into one update acting on tuples.

    The returned function takes an accumulator tuple ``t1`` and an item tuple
    ``t2`` and applies ``fns[k]`` to the k-th pair.  If ``t1`` is None, ``t2``
    is returned as is; a None slot inside ``t1`` is replaced by the matching
    item of ``t2`` without calling the update function.
    """
    def update_tuples(t1, t2):
        if t1 is None:
            return t2
        merged = []
        for f, acc, item in zip(fns, t1, t2):
            merged.append(item if acc is None else f(acc, item))
        return tuple(merged)
    return update_tuples
updates(add, count)((1,2), (100,10))

(101, 3)

In [181]:
@curry
def aggregate(key_function, agg_funcs, seq, init={}):
    """Group the rows of ``seq`` with ``key_function`` and reduce each group column by column.

    Parameters
    - key_function: grouping key function, passed straight to ``reduceby``
    - agg_funcs: one of
        * a callable, used as the update function for every key of the first row
        * a dict mapping column label -> update function
        * a dict mapping column label -> list of update functions, which are
          combined with ``updates`` and fed ``len(list)`` copies of each value
    - seq: sequence of row dicts
    - init: initial values.  With a callable ``agg_funcs`` a non-dict value is
      used for every key of the first row; otherwise a dict of per-column
      initial values (missing columns get None, or a list of Nones for a
      list of update functions)

    Returns the result of ``reduceby(key_function, merge_helper(...), seq, init=...)``.

    NOTE(review): ``init={}`` is a mutable default; this function only rebinds
    it — confirm nothing downstream mutates it.
    """
    # One function
    if callable(agg_funcs):
        # Look at the first row (without consuming it) to learn the column labels.
        f, seq = peek(seq)
        merge_funcs = {key:agg_funcs for key in f}
        # One function -> one initial value used for all
        if not isinstance(init, dict):
            init = {key:init for key in f}
    else:
        def make_updates(fns):
            if isinstance(fns, list):
                # List of update functions
                N = len(fns)
                # Replicate each incoming value N times so every update function sees it.
                merge_funcs = lambda tup, val: updates(*fns)(tup, split_item(N, val))
                return merge_funcs
            else:
                # One update function
                assert callable(fns)
                return fns
        merge_funcs = {key:make_updates(fns) for key, fns in agg_funcs.items()}
        def make_inits(key, fns, init):
            # No initial value supplied for this column: use None placeholders.
            if init is None or key not in init:
                return list(repeat(None, len(fns))) if isinstance(fns, list) else None
            else:
                return get(key, init)
        # Only columns that appear in agg_funcs receive an initial value.
        init = {key:make_inits(key, fns, init) for key, fns in agg_funcs.items()}
    return reduceby(key_function, merge_helper(merge_funcs), seq, init=init)
        
        

In [176]:
# Build the selected / filtered / converted row stream; there is no final
# `list`, so this cell does not materialise the rows.
example = pipe('Batting.csv',
               read_csv,
               select(['playerID', 'H', 'AB']),
               filter(at_least_500_AB),
               transform({'AB':int, 'H':maybe_int}))

## Aggregate with one function

In [144]:
# A single update function (count, starting at 0) is applied to every column,
# including playerID, so each column ends up holding the group's row count.
example = pipe('Batting.csv',
               read_csv,
               select(['playerID', 'H', 'AB']),
               filter(at_least_500_AB),
               transform({'AB':int, 'H':maybe_int}),
               aggregate(get('playerID'), count, init=0))
example

{'beckfr02': {'AB': 2, 'H': 2, 'playerID': 2},
 'saiervi01': {'AB': 2, 'H': 2, 'playerID': 2},
 'barfije01': {'AB': 3, 'H': 3, 'playerID': 3},
 'mazerbi01': {'AB': 10, 'H': 10, 'playerID': 10},
 'wisesa01': {'AB': 2, 'H': 2, 'playerID': 2},
 'zwilldu01': {'AB': 2, 'H': 2, 'playerID': 2},
 'snydeco02': {'AB': 3, 'H': 3, 'playerID': 3},
 'paulege01': {'AB': 1, 'H': 1, 'playerID': 1},
 'raymoha01': {'AB': 2, 'H': 2, 'playerID': 2},
 'poormto01': {'AB': 1, 'H': 1, 'playerID': 1},
 'washiul01': {'AB': 2, 'H': 2, 'playerID': 2},
 'huntro01': {'AB': 3, 'H': 3, 'playerID': 3},
 'doziebr01': {'AB': 3, 'H': 3, 'playerID': 3},
 'kurowwh01': {'AB': 5, 'H': 5, 'playerID': 5},
 'andremi01': {'AB': 3, 'H': 3, 'playerID': 3},
 'grababi01': {'AB': 1, 'H': 1, 'playerID': 1},
 'gordosi01': {'AB': 3, 'H': 3, 'playerID': 3},
 'orrda01': {'AB': 2, 'H': 2, 'playerID': 2},
 'wilsowi02': {'AB': 9, 'H': 9, 'playerID': 9},
 'drakede01': {'AB': 1, 'H': 1, 'playerID': 1},
 'davisgl01': {'AB': 4, 'H': 4, 'playerID'

## Aggregate with a dictionary of single functions

In [146]:
# One update function per column: 'AB' counts rows (starting at 0), 'H' sums hits
# (seeded from the first row); playerID has no function and keeps the latest value.
agg_funcs = {'AB':count, 'H':add}
inits = {'AB':0}
example = pipe('Batting.csv',
               read_csv,
               select(['playerID', 'H', 'AB']),
               filter(at_least_500_AB),
               take(1000),
               transform({'AB':int, 'H':maybe_int}),
               aggregate(get('playerID'), agg_funcs, init=inits))
               #list)
#{key:get(key, agg_funcs)(get(key, inits), val) for key, val in example[0]}
example

{'abbated01': {'AB': 2, 'H': 318, 'playerID': 'abbated01'},
 'abbeych01': {'AB': 2, 'H': 305, 'playerID': 'abbeych01'},
 'abstebi01': {'AB': 1, 'H': 133, 'playerID': 'abstebi01'},
 'allenbo01': {'AB': 1, 'H': 128, 'playerID': 'allenbo01'},
 'alperwh01': {'AB': 1, 'H': 130, 'playerID': 'alperwh01'},
 'altizda01': {'AB': 1, 'H': 145, 'playerID': 'altizda01'},
 'anderjo01': {'AB': 5, 'H': 808, 'playerID': 'anderjo01'},
 'andreed01': {'AB': 1, 'H': 126, 'playerID': 'andreed01'},
 'ansonca01': {'AB': 6, 'H': 991, 'playerID': 'ansonca01'},
 'babbch01': {'AB': 1, 'H': 138, 'playerID': 'babbch01'},
 'bakerfr01': {'AB': 1, 'H': 165, 'playerID': 'bakerfr01'},
 'barclge01': {'AB': 1, 'H': 163, 'playerID': 'barclge01'},
 'barreji01': {'AB': 5, 'H': 815, 'playerID': 'barreji01'},
 'barrysh01': {'AB': 2, 'H': 308, 'playerID': 'barrysh01'},
 'bassech01': {'AB': 1, 'H': 136, 'playerID': 'bassech01'},
 'batchem01': {'AB': 1, 'H': 143, 'playerID': 'batchem01'},
 'batesjo02': {'AB': 1, 'H': 127, 'playerI

## Using updates to aggregate multiple functions per column

In [117]:
def updates(*fns):
    """Bundle several binary reduction updates into one update acting on tuples.

    The returned function takes an accumulator tuple ``t1`` and an item tuple
    ``t2`` and applies ``fns[k]`` to the k-th pair.  If ``t1`` is None, ``t2``
    is returned as is; a None slot inside ``t1`` is replaced by the matching
    item of ``t2`` without calling the update function.
    """
    def update_tuples(t1, t2):
        if t1 is None:
            return t2
        merged = []
        for f, acc, item in zip(fns, t1, t2):
            merged.append(item if acc is None else f(acc, item))
        return tuple(merged)
    return update_tuples
updates(add, count)((1,2), (100,10))

(101, 3)

In [177]:
# Several statistics per column, wired by hand: each value is first replicated
# with split_item (2 copies for AB, 3 for H) and the copies are then reduced by
# updates(...) — (sum, count) for AB and (sum, count, max) for H.
agg_funcs = {'AB':updates(add, count), 'H':updates(add, count, max)}
inits = {'AB':(0, 0), 'H':(0, 0, 0)}
example = pipe('Batting.csv',
               read_csv,
               select(['playerID', 'H', 'AB']),
               filter(at_least_500_AB),
               take(1000),
               transform({'AB':int, 'H':maybe_int}),
               transform({'AB':split_item(2), 'H':split_item(3)}),
               aggregate(get('playerID'), agg_funcs, init=inits))
               #list)
#{key:get(key, agg_funcs)(get(key, inits), val) for key, val in example[0]}
example

{'abbated01': {'AB': (1189, 2), 'H': (318, 2, 170), 'playerID': 'abbated01'},
 'abbeych01': {'AB': (1034, 2), 'H': (305, 2, 164), 'playerID': 'abbeych01'},
 'abstebi01': {'AB': (512, 1), 'H': (133, 1, 133), 'playerID': 'abstebi01'},
 'allenbo01': {'AB': (563, 1), 'H': (128, 1, 128), 'playerID': 'allenbo01'},
 'alperwh01': {'AB': (558, 1), 'H': (130, 1, 130), 'playerID': 'alperwh01'},
 'altizda01': {'AB': (540, 1), 'H': (145, 1, 145), 'playerID': 'altizda01'},
 'anderjo01': {'AB': (2791, 5), 'H': (808, 5, 190), 'playerID': 'anderjo01'},
 'andreed01': {'AB': (528, 1), 'H': (126, 1, 126), 'playerID': 'andreed01'},
 'ansonca01': {'AB': (3140, 6), 'H': (991, 6, 187), 'playerID': 'ansonca01'},
 'babbch01': {'AB': (521, 1), 'H': (138, 1, 138), 'playerID': 'babbch01'},
 'bakerfr01': {'AB': (541, 1), 'H': (165, 1, 165), 'playerID': 'bakerfr01'},
 'barclge01': {'AB': (543, 1), 'H': (163, 1, 163), 'playerID': 'barclge01'},
 'barreji01': {'AB': (2737, 5), 'H': (815, 5, 172), 'playerID': 'barreji01

## Allow a list of functions to compute multiple statistics per cell automatically

In [185]:
# Same statistics as the updates(...) version, but passing lists of update
# functions: aggregate replicates the values and combines the functions itself.
# A None initial value seeds that statistic from the first row of each group.
agg_funcs = {'AB':[add, count], 'H':[add, count, max]}
inits = {'AB':(None, 0), 'H':(None, 0, None)}
example = pipe('Batting.csv',
               read_csv,
               select(['playerID', 'H', 'AB']),
               filter(at_least_500_AB),
               take(1000),
               transform({'AB':int, 'H':maybe_int}),
               aggregate(get('playerID'), agg_funcs, init=inits))
               #list)
#{key:get(key, agg_funcs)(get(key, inits), val) for key, val in example[0]}
example

{'abbated01': {'AB': (1189, 2), 'H': (318, 2, 170), 'playerID': 'abbated01'},
 'abbeych01': {'AB': (1034, 2), 'H': (305, 2, 164), 'playerID': 'abbeych01'},
 'abstebi01': {'AB': (512, 1), 'H': (133, 1, 133), 'playerID': 'abstebi01'},
 'allenbo01': {'AB': (563, 1), 'H': (128, 1, 128), 'playerID': 'allenbo01'},
 'alperwh01': {'AB': (558, 1), 'H': (130, 1, 130), 'playerID': 'alperwh01'},
 'altizda01': {'AB': (540, 1), 'H': (145, 1, 145), 'playerID': 'altizda01'},
 'anderjo01': {'AB': (2791, 5), 'H': (808, 5, 190), 'playerID': 'anderjo01'},
 'andreed01': {'AB': (528, 1), 'H': (126, 1, 126), 'playerID': 'andreed01'},
 'ansonca01': {'AB': (3140, 6), 'H': (991, 6, 187), 'playerID': 'ansonca01'},
 'babbch01': {'AB': (521, 1), 'H': (138, 1, 138), 'playerID': 'babbch01'},
 'bakerfr01': {'AB': (541, 1), 'H': (165, 1, 165), 'playerID': 'bakerfr01'},
 'barclge01': {'AB': (543, 1), 'H': (163, 1, 163), 'playerID': 'barclge01'},
 'barreji01': {'AB': (2737, 5), 'H': (815, 5, 172), 'playerID': 'barreji01