In [17]:
import dask
import dask.bag as db

In [1]:
def stem(word):
    """Stem a word"""
    return word.lower().rstrip(",.!:;'-\"").lstrip("'\"")

In [14]:
from toolz import compose, frequencies, partial, identity
import toolz
from toolz.curried import map

In [4]:
wordcount = compose(frequencies, map(stem), str.split)

In [5]:
sentence = "The quick brown fox jumped over the lazy dog!"

In [6]:
wordcount(sentence)

{'brown': 1,
 'dog': 1,
 'fox': 1,
 'jumped': 1,
 'lazy': 1,
 'over': 1,
 'quick': 1,
 'the': 2}

In [16]:
identity(stem)

<function __main__.stem>

In [92]:
b = db.read_text('/Users/alexanderkunkel/bin/gutenberg/aleph.gutenberg.org/0/1/1.txt.gz', compression='gzip', errors='replace').str.strip().str.rstrip(",.!:;'-\"").str.lstrip("'\"").str.lower().str.split(' ')

In [93]:
list(b.flatten()) # concatenate nested lists into one long list

['',
 '',
 '',
 'note:',
 '',
 'this',
 'file',
 'combines',
 'the',
 'first',
 'two',
 'project',
 'gutenberg',
 'files,',
 'both',
 'of',
 'which',
 'were',
 'given',
 'the',
 'filenumber',
 '#1.',
 'there',
 'are',
 'several',
 'duplicate',
 'files',
 'here.',
 'there',
 'were',
 'many',
 'updates',
 'over',
 'the',
 'years.',
 '',
 'all',
 'of',
 'the',
 'original',
 'files',
 'are',
 'included',
 'in',
 'the',
 'old"',
 'subdirectory',
 'which',
 'may',
 'be',
 'accessed',
 'under',
 'the',
 '"more',
 'files"',
 'listing',
 'in',
 'the',
 'pg',
 'catalog',
 'of',
 'this',
 'file.',
 'no',
 'changes',
 'have',
 'been',
 'made',
 'in',
 'these',
 'original',
 'etexts',
 '',
 '',
 '',
 '**welcome',
 'to',
 'the',
 'world',
 'of',
 'free',
 'plain',
 'vanilla',
 'electronic',
 'texts**',
 '',
 '**etexts',
 'readable',
 'by',
 'both',
 'humans',
 'and',
 'by',
 'computers,',
 'since',
 '1971**',
 '',
 '*these',
 'etexts',
 'prepared',
 'by',
 'hundreds',
 'of',
 'volunteers',
 'and',
 

Important to note, that within dask, you will often see npartitions.  The npartitions property is the number of dask piecemeal objects (bag, array, dataframes) that compose a single Dask object (so npartitions = 10, would split a dask object into 10 bins for parallel computation

In [48]:
import dask.bag as db

In [49]:
b = db.from_sequence([{'name': 'Alice',   'balance': 100},
                      {'name': 'Bob',     'balance': 200},
                      {'name': 'Charlie', 'balance': 300}])

In [50]:
df = b.to_dataframe()

In [52]:
df.compute()

Unnamed: 0,balance,name
0,100,Alice
0,200,Bob
0,300,Charlie


In [55]:
b = db.from_sequence(range(5))

In [56]:
b.compute()

[0, 1, 2, 3, 4]

In [60]:
b = b.filter(lambda x: x % 2 == 0).map(lambda x: x ** 2)

In [61]:
list(b)

[0, 4, 16]

In [74]:
from operator import add
b = db.from_sequence([1, 2, 3, 4, 5], npartitions=2)

In [75]:
b.accumulate(add).compute()

[1, 3, 6, 10, 15]

In [77]:
b.accumulate(add, initial=-1).compute()

[-1, 0, 2, 5, 9, 14]

In [82]:
b.count().compute() # count is len, count number of elements in a collection

5

In [84]:
b = db.from_sequence([1, 2, 3, 4, 5, 3, 4, 5, 6, 1, 2, 3, 4])
sorted(b.distinct()) # distinct elements of a collection, unordered, without repeats

[1, 2, 3, 4, 5, 6]

In [89]:
def iseven(x):
    return x % 2 == 0
def isodd(x):
    return x % 2 != 0

In [88]:
b = db.from_sequence(range(10))
list(b.filter(iseven)) # filter elements in a collection by a predicate function

[0, 2, 4, 6, 8]

In [90]:
b = db.from_sequence(range(10))
list(b.filter(isodd)) # filter elements in a collection by a predicate function

[1, 3, 5, 7, 9]

`Fold` is like the builtin function reduce except that it works in parallel. Fold takes two binary operator functions, one to reduce each partition of our dataset and another to combine results between partitions  

`binop`: Binary operator to reduce within each partition  
`combine`: Binary operator to combine results from binop

In [95]:
def add(x, y):
    return x + y

In [97]:
def multiply(x, y):
    return x * y

In [107]:
b = db.from_sequence(range(10))
b.fold(add).compute()

45

In [105]:
b = db.from_sequence(range(1, 100))
b.fold(multiply).compute() # if only one function is given, then it is used for both functions binop and combine

933262154439441526816992388562667004907159682643816214685929638952175999932299156089414639761565182862536979208272237582511852109168640000000000000000000000

In [112]:
b.fold(binop=add, combine=add, initial=0).compute()

45

`foldby` : combining reduction and groupby - parallel split-apply-combine
```python
b.foldby(key, binop, init)

is equal to

def reduction(group):
    retrun reduce(binop, group, init)

b.groupby(key).map(lambda (k, v): (k, reduction(v)))
```

In [113]:
b = db.from_sequence(range(10))
iseven = lambda x: x % 2 == 0
add = lambda x, y: x + y
dict(b.foldby(iseven, add))

{False: 25, True: 20}

In [119]:
b = db.from_sequence(sentence.lower().split(' '))

In [121]:
dict(b.frequencies()) # count number of occurences of each distinct element

{'brown': 1,
 'dog!': 1,
 'fox': 1,
 'jumped': 1,
 'lazy': 1,
 'over': 1,
 'quick': 1,
 'the': 2}

`Groupby` grouping collection by key function, where grouper is the function, method is str, npartitions is int, blocksize is int, max_branch is int

In [127]:
b = db.from_sequence(range(10))
iseven = lambda x: x % 2 == 0
dict(b.groupby(iseven))

{False: [9, 7, 5, 3, 1], True: [4, 6, 2, 8, 0]}

`join` joining collection with another collection, other collection must be an iterable and not a bag  
`join(other, on_self, on_other=None)`

In [129]:
people = db.from_sequence(['Alice', 'Bob', 'Charlie'])
fruit = ['Apple', 'Apricot', 'Banana']
list(people.join(fruit, lambda x: x[0]))

[('Apple', 'Alice'), ('Apricot', 'Alice'), ('Banana', 'Bob')]

`map` apply a function elementwise across one or more bags.  Note, all Bag arguments must be partitioned identically.  Takes a callable `func` and `*args`, `**kwargs` which are the extra arguments to pass to `func` after the calling bag instance.  Non-bag args/kwargs are broadcast across all calls

In [130]:
b = db.from_sequence(range(5), npartitions=2)
b2 = db.from_sequence(range(5, 10), npartitions=2)

In [131]:
b.map(lambda x: x + 1).compute()

[1, 2, 3, 4, 5]

In [132]:
b.map(add, b2).compute()

[5, 7, 9, 11, 13]

In [133]:
b.map(add, 1).compute()

[1, 2, 3, 4, 5]

In [134]:
def myadd(x, y=0): # keyword arguments are supported
    return x + y
b.map(myadd, y=b2).compute()

[5, 7, 9, 11, 13]

In [135]:
b.map(myadd, y=1).compute()

[1, 2, 3, 4, 5]

In [136]:
# Both arguments and keyword arguments can also be instances of dask.bag.Item
b.map(myadd, b.max()).compute()

[4, 5, 6, 7, 8]

`map_partitions(func, *args, **kwargs)` apply a function to every partition across one or more bags.

In [137]:
b = db.from_sequence(range(1, 101), npartitions=10)
def div(nums, den=1):
    return [num / den for num in nums]

In [138]:
hi = b.max().compute()

In [139]:
hi

100

In [140]:
b.map_partitions(div, den=hi).take(5)

(0.01, 0.02, 0.03, 0.04, 0.05)

In [141]:
b.map_partitions(div, den=b.max()).take(5)

(0.01, 0.02, 0.03, 0.04, 0.05)

`max(), min(), mean()`

`pluck(key, default='__no__default__')` selects items from all tuples/dictionaries in collection

In [144]:
b = db.from_sequence([{'name': 'Alice', 'credits': [1, 2, 3]},
                   {'name': 'Bob',   'credits': [10, 20]},
                   {'name': 'Alex',  'credits': [2, 3, 4, 10]}])

In [145]:
list(b.pluck('name'))

['Alice', 'Bob', 'Alex']

In [146]:
list(b.pluck('credits'))

[[1, 2, 3], [10, 20], [2, 3, 4, 10]]

In [150]:
list(b.pluck('credits').pluck(1)) # zero indexed, pluck index 1 from plucked credits

[2, 20, 3]

`product(other)` returns the cartesian product between two bags

`random_sample(prob, random_state=None)` returns elements from the bag with probability of prob, where prob is a float between 0 and 1, representing probability that each element is returned, random_state (int or random.Random)

In [155]:
b = db.from_sequence(range(10))

In [157]:
list(b.random_sample(0.25, random_state=42))

[1, 6, 7]

`reduction(prepartation, aggregate, split_every=None, out_type=<class 'dask.bag.core.Item'>, name=None)` will reduce collection with reduction operations, where perpartitation is a reduction function to apply to each partition, aggregate is the reduction function to apply to the results of all partitions, split_every is the side we want to group partitions while performing reduction

In [162]:
b = db.from_sequence(range(10))
b.reduction(sum, sum).compute()

45

`remove(predicate)` removes elements in collection that matches the predicate

In [163]:
iseven = lambda x: x % 2 == 0

In [164]:
b = db.from_sequence(range(10))
list(b.remove(iseven))

[1, 3, 5, 7, 9]

`starmap(func, **kwargs)` apply a function using argument tuples from the given bag

In [165]:
data = [(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]
b = db.from_sequence(data, npartitions=2)

In [166]:
b.starmap(add).compute()

[3, 7, 11, 15, 19]

Apply a function to each argument tuple, with additional keyword arguments:

In [167]:
def myadd(x, y, z=0):
    return x + y + z

In [169]:
b.starmap(myadd, z=10).compute()

[13, 17, 21, 25, 29]

In [170]:
max_second = b.pluck(1).max()
max_second.compute()

10

In [171]:
b.starmap(myadd, z=max_second).compute()

[13, 17, 21, 25, 29]

`std(ddof=0)` Standard Deviation: ddof means Delta Degrees of Freedom. The divisor used in calculations is N - ddof, where N represents the number of elements. By default ddof is zero.

`str` string processing function

In [175]:
b = db.from_sequence(['Alice Smith', 'Bob Jones', 'Charlie Smith', 'Alex Kunk', 'Albert Einstien', 'Joe Smith'])
list(b.str.lower())

['alice smith',
 'bob jones',
 'charlie smith',
 'alex kunk',
 'albert einstien',
 'joe smith']

In [179]:
list(b.str.match('*Smith'))

['Alice Smith', 'Charlie Smith', 'Joe Smith']

In [178]:
list(b.str.match('*A*'))

['Alice Smith', 'Alex Kunk', 'Albert Einstien']

In [174]:
list(b.str.split(' '))

[['Alice', 'Smith'], ['Bob', 'Jones'], ['Charlie', 'Smith']]

`sum(split_every=None)` Sum all elements

`take(k, npartitions=1, compute=True, warn=True)` take the first k elements (k is number of elements to return), compute is whether to compute the result, whether to warn if the number of elements returned is less than requested, default is True.

`topk(k, key=None, split_every=None)` K largest elements in collection, optionally ordered by some key function

In [180]:
b = db.from_sequence([10, 3, 5,7, 11, 4, 8])
list(b.topk(2))

[11, 10]

In [181]:
list(b.topk(2, key=lambda x: x % 2 == 0))

[10, 4]

In [182]:
list(b.topk(2, lambda x: -x))  

[3, 4]

`unzip(n) ` transforming a bag of tuples to n bags of their elements

In [183]:
b = db.from_sequence([(i, i + 1, i + 2) for i in range(10)])

In [190]:
b.compute()

[(0, 1, 2),
 (1, 2, 3),
 (2, 3, 4),
 (3, 4, 5),
 (4, 5, 6),
 (5, 6, 7),
 (6, 7, 8),
 (7, 8, 9),
 (8, 9, 10),
 (9, 10, 11)]

In [184]:
first, second, third = b.unzip(3)

In [186]:
isinstance(first, db.Bag)

True

In [187]:
first.compute()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [188]:
second.compute()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [189]:
third.compute()

[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]

`var(ddof=0)` compute variance

In [208]:
b = db.range(100, npartitions=10)

In [None]:
b.var().compute()

In [194]:
b = db.range(5, npartitions=2)
list(b)

[0, 1, 2, 3, 4]