<img src="images/dask_horizontal.svg" align="right" width="30%">

# Bag: Parallel Lists for semi-structured data

Dask-bag excels in processing data that can be represented as a sequence of arbitrary inputs. We'll refer to this as "messy" data, because it can contain complex nested structures, missing fields, mixtures of data types, etc. The *functional* programming style fits very nicely with standard Python iteration, such as can be found in the `itertools` module.

Messy data is often encountered at the beginning of data processing pipelines when large volumes of raw data are first consumed. The initial set of data might be JSON, CSV, XML, or any other format that does not enforce strict structure and datatypes.
For this reason, the initial data massaging and processing is often done with Python `list`s, `dict`s, and `set`s.

These core data structures are optimized for general-purpose storage and processing.  Adding streaming computation with iterators/generator expressions or libraries like `itertools` or [`toolz`](https://toolz.readthedocs.io/en/latest/) lets us process large volumes in a small space.  If we combine this with parallel processing then we can churn through a fair amount of data.
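
As a minimal sketch of the streaming style described above, the following uses only the standard library to process newline-delimited JSON one record at a time. The sample lines and the `parse_records` helper are made up for illustration; a real pipeline would read lines lazily from a file rather than from a list.

```python
import json
from itertools import islice

# Hypothetical sample input: one JSON document per line.
raw_lines = [
    '{"name": "Alice", "amount": 100}',
    '{"name": "Bob", "amount": 200}',
    '{"name": "Alice", "amount": 50}',
    '{"name": "Charlie", "amount": 75}',
]

def parse_records(lines):
    """Yield one parsed record at a time instead of building a full list."""
    for line in lines:
        yield json.loads(line)

# Generator expressions chain lazily: nothing is parsed until it is consumed.
alice_amounts = (
    record["amount"]
    for record in parse_records(raw_lines)
    if record["name"] == "Alice"
)

# islice pulls only the first two matches and then stops.
first_two = list(islice(alice_amounts, 2))
print(first_two)
```

Only one record needs to be in memory at any time, which is what keeps the footprint small.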

Dask.bag is a high level Dask collection to automate common workloads of this form.  In a nutshell:

    dask.bag = map, filter, toolz + parallel execution
    
**Related Documentation**

* [Bag documentation](https://docs.dask.org/en/latest/bag.html)
* [Bag screencast](https://youtu.be/-qIiJ1XtSv0)
* [Bag API](https://docs.dask.org/en/latest/bag-api.html)
* [Bag examples](https://examples.dask.org/bag.html)

## Create data

In [1]:
%run prep.py -d accounts

## Setup

Again, we'll use the distributed scheduler. Schedulers will be explained in depth [later](05_distributed.ipynb).

In [2]:
from dask.distributed import Client

client = Client(n_workers=4)

## Creation

You can create a `Bag` from a Python sequence, from files, from data on S3 (Amazon's cloud storage service), etc.
We demonstrate using `.take()` to show elements of the data. (Doing `.take(1)` results in a tuple with one element.)

Note that the data are partitioned into blocks, and there are many items per block. In the first example, the two partitions contain five elements each, and in the following two, each file is partitioned into one or more blocks of bytes.
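
To make the idea of partitions concrete, here is a plain-Python sketch that splits a sequence into contiguous blocks. The helper `split_into_partitions` and its chunk-size rule are assumptions for illustration only; Dask's own partitioning logic may differ in detail.

```python
import math

def split_into_partitions(seq, npartitions):
    """Split a list into at most `npartitions` contiguous blocks (illustrative only)."""
    size = max(1, math.ceil(len(seq) / npartitions))
    return [seq[i:i + size] for i in range(0, len(seq), size)]

partitions = split_into_partitions(list(range(1, 11)), npartitions=2)
sizes = [len(p) for p in partitions]
print(partitions)
print(sizes)
```

With ten elements and two partitions, each block holds five elements, matching the first example described above.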

In [3]:
# each element is an integer
import dask.bag as db
b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], npartitions=2)
b.take(3)

(1, 2, 3)

In [4]:
# each element is a text file, where each line is a JSON object
# note that the compression is handled automatically
import os
b = db.read_text(os.path.join('data', 'accounts.*.json.gz'))
b.take(1)

('{"id": 0, "name": "Michael", "transactions": [{"transaction-id": 50, "amount": 497}, {"transaction-id": 99, "amount": 624}, {"transaction-id": 351, "amount": 610}, {"transaction-id": 475, "amount": 547}, {"transaction-id": 661, "amount": 627}, {"transaction-id": 731, "amount": 590}, {"transaction-id": 806, "amount": 610}, {"transaction-id": 906, "amount": 655}, {"transaction-id": 983, "amount": 630}, {"transaction-id": 993, "amount": 632}, {"transaction-id": 1033, "amount": 545}, {"transaction-id": 1084, "amount": 603}, {"transaction-id": 1409, "amount": 586}, {"transaction-id": 1413, "amount": 541}, {"transaction-id": 1667, "amount": 589}, {"transaction-id": 1793, "amount": 599}, {"transaction-id": 1799, "amount": 543}, {"transaction-id": 1838, "amount": 445}, {"transaction-id": 2110, "amount": 578}, {"transaction-id": 2275, "amount": 614}, {"transaction-id": 2700, "amount": 554}, {"transaction-id": 2759, "amount": 494}, {"transaction-id": 2841, "amount": 554}, {"transaction-id": 28

In [5]:
# Edit sources.py to configure source locations
import sources
sources.bag_url

's3://dask-data/nyc-taxi/2015/yellow_tripdata_2015-01.csv'

In [6]:
# Requires `s3fs` library
# each partition is a remote CSV text file
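# This file is about 1.8 GB, so a reasonable amount of memory is needed.
# Access to Amazon's cloud from mainland China can also be difficult; without a proxy or VPN this cell may not run.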
b = db.read_text(sources.bag_url,
                 storage_options={'anon': True})
b.take(1)

('VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RateCodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount\n',)

## Manipulation

`Bag` objects hold the standard functional API found in projects like the Python standard library, `toolz`, or `pyspark`, including `map`, `filter`, `groupby`, etc.

Operations on `Bag` objects create new bags.  Call the `.compute()` method to trigger execution, as we saw for `Delayed` objects.  
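
The deferred execution described above can be illustrated with Python's built-in lazy iterators. This is only an analogy, not how Dask is implemented: building the pipeline does no work, and consuming it plays the role of `.compute()`. The `calls` list is a hypothetical probe added to show when the function actually runs.

```python
calls = []  # records every value that `square` is actually applied to

def square(x):
    calls.append(x)
    return x ** 2

def is_even(n):
    return n % 2 == 0

# Built-in filter/map return lazy iterators, so no work happens here.
pipeline = map(square, filter(is_even, range(1, 11)))
calls_before = list(calls)

# Consuming the iterator triggers the computation.
result = list(pipeline)
print(calls_before)
print(result)
```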

In [8]:
def is_even(n):
    return n % 2 == 0

b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
c = b.filter(is_even).map(lambda x: x ** 2)
c

dask.bag<lambda, npartitions=10>

In [9]:
# blocking form: wait for completion (which is very fast in this case)
c.compute()

[4, 16, 36, 64, 100]

### Example: Accounts JSON data

We've created a fake dataset of gzipped JSON data in your data directory.  This is like the example used in the `DataFrame` example we will see later, except that it has bundled up all of the entries for each individual `id` into a single record.  This is similar to data that you might collect off of a document store database or a web API.

Each line is a JSON encoded dictionary with the following keys

*  id: Unique identifier of the customer
*  name: Name of the customer
*  transactions: List of `transaction-id`, `amount` pairs, one for each transaction for the customer in that file

In [11]:
filename = os.path.join('data', 'accounts.*.json.gz')
lines = db.read_text(filename)
lines.take(3)

('{"id": 0, "name": "Michael", "transactions": [{"transaction-id": 50, "amount": 497}, {"transaction-id": 99, "amount": 624}, {"transaction-id": 351, "amount": 610}, {"transaction-id": 475, "amount": 547}, {"transaction-id": 661, "amount": 627}, {"transaction-id": 731, "amount": 590}, {"transaction-id": 806, "amount": 610}, {"transaction-id": 906, "amount": 655}, {"transaction-id": 983, "amount": 630}, {"transaction-id": 993, "amount": 632}, {"transaction-id": 1033, "amount": 545}, {"transaction-id": 1084, "amount": 603}, {"transaction-id": 1409, "amount": 586}, {"transaction-id": 1413, "amount": 541}, {"transaction-id": 1667, "amount": 589}, {"transaction-id": 1793, "amount": 599}, {"transaction-id": 1799, "amount": 543}, {"transaction-id": 1838, "amount": 445}, {"transaction-id": 2110, "amount": 578}, {"transaction-id": 2275, "amount": 614}, {"transaction-id": 2700, "amount": 554}, {"transaction-id": 2759, "amount": 494}, {"transaction-id": 2841, "amount": 554}, {"transaction-id": 28

Our data comes out of the file as lines of text. Notice that file decompression happened automatically. We can make this data look more reasonable by mapping the `json.loads` function onto our bag.
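
The generated accounts files are well-formed, but real "messy" input may contain lines that are not valid JSON. A minimal sketch of a tolerant parser follows; `safe_loads` and the sample lines are hypothetical and not part of the tutorial's data. On a bag, the same idea could be expressed with `lines.map(safe_loads).filter(lambda r: r is not None)`.

```python
import json

def safe_loads(line):
    """Return the parsed record, or None if the line is not valid JSON."""
    try:
        return json.loads(line)
    except json.JSONDecodeError:
        return None

# Hypothetical input with one corrupt line in the middle.
lines_sample = [
    '{"id": 0, "name": "Michael"}',
    'not valid json',
    '{"id": 1, "name": "Alice"}',
]

# Parse every line, then drop the ones that failed.
parsed = [rec for rec in map(safe_loads, lines_sample) if rec is not None]
print(parsed)
```

Note that a line containing the JSON literal `null` would also be dropped by this filter, which is usually acceptable for record-oriented data.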

In [12]:
import json
js = lines.map(json.loads)
# take: inspect first few elements
js.take(3)

({'id': 0,
  'name': 'Michael',
  'transactions': [{'transaction-id': 50, 'amount': 497},
   {'transaction-id': 99, 'amount': 624},
   {'transaction-id': 351, 'amount': 610},
   {'transaction-id': 475, 'amount': 547},
   {'transaction-id': 661, 'amount': 627},
   {'transaction-id': 731, 'amount': 590},
   {'transaction-id': 806, 'amount': 610},
   {'transaction-id': 906, 'amount': 655},
   {'transaction-id': 983, 'amount': 630},
   {'transaction-id': 993, 'amount': 632},
   {'transaction-id': 1033, 'amount': 545},
   {'transaction-id': 1084, 'amount': 603},
   {'transaction-id': 1409, 'amount': 586},
   {'transaction-id': 1413, 'amount': 541},
   {'transaction-id': 1667, 'amount': 589},
   {'transaction-id': 1793, 'amount': 599},
   {'transaction-id': 1799, 'amount': 543},
   {'transaction-id': 1838, 'amount': 445},
   {'transaction-id': 2110, 'amount': 578},
   {'transaction-id': 2275, 'amount': 614},
   {'transaction-id': 2700, 'amount': 554},
   {'transaction-id': 2759, 'amount': 49

### Basic Queries

Once we parse our JSON data into proper Python objects (`dict`s, `list`s, etc.) we can perform more interesting queries by creating small Python functions to run on our data.

In [13]:
# filter: keep only some elements of the sequence
js.filter(lambda record: record['name'] == 'Alice').take(5)

({'id': 9,
  'name': 'Alice',
  'transactions': [{'transaction-id': 433, 'amount': 278},
   {'transaction-id': 606, 'amount': 284},
   {'transaction-id': 1576, 'amount': 300},
   {'transaction-id': 1599, 'amount': 290},
   {'transaction-id': 1763, 'amount': 270},
   {'transaction-id': 1883, 'amount': 297},
   {'transaction-id': 2135, 'amount': 252},
   {'transaction-id': 2149, 'amount': 233},
   {'transaction-id': 2534, 'amount': 291},
   {'transaction-id': 2542, 'amount': 311},
   {'transaction-id': 2544, 'amount': 298},
   {'transaction-id': 2552, 'amount': 316},
   {'transaction-id': 2570, 'amount': 257},
   {'transaction-id': 3002, 'amount': 286},
   {'transaction-id': 3068, 'amount': 245},
   {'transaction-id': 4059, 'amount': 283},
   {'transaction-id': 4886, 'amount': 262},
   {'transaction-id': 5916, 'amount': 269},
   {'transaction-id': 5934, 'amount': 284},
   {'transaction-id': 6025, 'amount': 299},
   {'transaction-id': 6473, 'amount': 240},
   {'transaction-id': 6820, 'amo

In [17]:
def count_transactions(d):
    return {'name': d['name'], 'count': len(d['transactions'])}

# map: apply a function to each element
(js.filter(lambda record: record['name'] == 'Alice')
   .map(count_transactions)
   .take(5))

({'name': 'Alice', 'count': 252},
 {'name': 'Alice', 'count': 220},
 {'name': 'Alice', 'count': 468},
 {'name': 'Alice', 'count': 15},
 {'name': 'Alice', 'count': 67})

In [18]:
# pluck: select a field, as from a dictionary, element[field]
(js.filter(lambda record: record['name'] == 'Alice')
   .map(count_transactions)
   .pluck('count')
   .take(5))

(252, 220, 468, 15, 67)

In [19]:
# Average number of transactions for all of the Alice entries
(js.filter(lambda record: record['name'] == 'Alice')
   .map(count_transactions)
   .pluck('count')
   .mean()
   .compute())

152.3435294117647

### Use `flatten` to de-nest

In the example below we see the use of `.flatten()` to flatten results.  We compute the average amount for all transactions for all Alices.

In [20]:
js.filter(lambda record: record['name'] == 'Alice').pluck('transactions').take(3)

([{'transaction-id': 433, 'amount': 278},
  {'transaction-id': 606, 'amount': 284},
  {'transaction-id': 1576, 'amount': 300},
  {'transaction-id': 1599, 'amount': 290},
  {'transaction-id': 1763, 'amount': 270},
  {'transaction-id': 1883, 'amount': 297},
  {'transaction-id': 2135, 'amount': 252},
  {'transaction-id': 2149, 'amount': 233},
  {'transaction-id': 2534, 'amount': 291},
  {'transaction-id': 2542, 'amount': 311},
  {'transaction-id': 2544, 'amount': 298},
  {'transaction-id': 2552, 'amount': 316},
  {'transaction-id': 2570, 'amount': 257},
  {'transaction-id': 3002, 'amount': 286},
  {'transaction-id': 3068, 'amount': 245},
  {'transaction-id': 4059, 'amount': 283},
  {'transaction-id': 4886, 'amount': 262},
  {'transaction-id': 5916, 'amount': 269},
  {'transaction-id': 5934, 'amount': 284},
  {'transaction-id': 6025, 'amount': 299},
  {'transaction-id': 6473, 'amount': 240},
  {'transaction-id': 6820, 'amount': 279},
  {'transaction-id': 7422, 'amount': 264},
  {'transacti

In [21]:
(js.filter(lambda record: record['name'] == 'Alice')
   .pluck('transactions')
   .flatten()
   .take(3))

({'transaction-id': 433, 'amount': 278},
 {'transaction-id': 606, 'amount': 284},
 {'transaction-id': 1576, 'amount': 300})

In [22]:
(js.filter(lambda record: record['name'] == 'Alice')
   .pluck('transactions')
   .flatten()
   .pluck('amount')
   .take(3))

(278, 284, 300)

In [23]:
(js.filter(lambda record: record['name'] == 'Alice')
   .pluck('transactions')
   .flatten()
   .pluck('amount')
   .mean()
   .compute())

300.0049501127483
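
Why flatten before averaging? The plain-Python sketch below, using made-up records, shows that the mean over all transactions differs from the mean of per-record means when records hold different numbers of transactions. `itertools.chain.from_iterable` stands in for `.flatten()` here.

```python
from itertools import chain
from statistics import mean

# Hypothetical records with unequal numbers of transactions.
records = [
    {"name": "Alice", "transactions": [{"transaction-id": 1, "amount": 10},
                                       {"transaction-id": 2, "amount": 20}]},
    {"name": "Alice", "transactions": [{"transaction-id": 3, "amount": 60}]},
]

nested = [r["transactions"] for r in records]   # like .pluck('transactions')
flat = list(chain.from_iterable(nested))        # like .flatten()
amounts = [t["amount"] for t in flat]           # like .pluck('amount')

mean_of_all = mean(amounts)
mean_of_means = mean([mean([t["amount"] for t in txns]) for txns in nested])
print(mean_of_all, mean_of_means)
```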

### Groupby and Foldby

Often we want to group data by some function or key.  We can do this either with the `.groupby` method, which is straightforward but forces a full shuffle of the data (expensive) or with the harder-to-use but faster `.foldby` method, which does a streaming combined groupby and reduction.

*  `groupby`:  Shuffles data so that all items with the same key are in the same key-value pair
*  `foldby`:  Walks through the data accumulating a result per key

*Note: the full groupby is particularly expensive because it shuffles the whole dataset. In actual workloads you would do well to use `foldby` or switch to `DataFrame`s if possible.*

### `groupby`

Groupby collects items in your collection so that all items with the same value under some function are collected together into a key-value pair.

In [24]:
b = db.from_sequence(['Alice', 'Bob', 'Charlie', 'Dan', 'Edith', 'Frank'])
b.groupby(len).compute()  # names grouped by length

[(7, ['Charlie']), (3, ['Bob', 'Dan']), (5, ['Alice', 'Edith', 'Frank'])]

In [25]:
b = db.from_sequence(list(range(10)))
b.groupby(lambda x: x % 2).compute()

[(0, [0, 2, 4, 6, 8]), (1, [1, 3, 5, 7, 9])]

In [27]:
# Apply a function using argument tuples from the given bag.
b.groupby(lambda x: x % 2).starmap(lambda k, v: (k, max(v))).compute()

[(0, 8), (1, 9)]
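
For intuition, the grouping semantics shown above can be sketched in plain Python with a dictionary of lists. `groupby_sketch` is a hypothetical helper, not Dask's implementation; it keeps every group fully in memory, which hints at why a full groupby is costly on large data.

```python
from collections import defaultdict

def groupby_sketch(key, seq):
    """Collect items into (key, [items]) pairs, holding every group in memory."""
    groups = defaultdict(list)
    for item in seq:
        groups[key(item)].append(item)
    return list(groups.items())

names = ['Alice', 'Bob', 'Charlie', 'Dan', 'Edith', 'Frank']
by_length = groupby_sketch(len, names)
print(sorted(by_length))

# Reducing each group afterwards, like the starmap example above.
max_by_parity = {k: max(v) for k, v in groupby_sketch(lambda x: x % 2, range(10))}
print(max_by_parity)
```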

### `foldby`

Foldby can be quite odd at first.  It is similar to the following functions from other libraries:

*  [`toolz.reduceby`](http://toolz.readthedocs.io/en/latest/streaming-analytics.html#streaming-split-apply-combine)
*  [`pyspark.RDD.combineByKey`](http://abshinn.github.io/python/apache-spark/2014/10/11/using-combinebykey-in-apache-spark/)

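Some background on these helps here. [toolz](https://github.com/pytoolz/toolz) is a collection of utility functions that operate on iterators, functions, and dictionaries, all following functional programming conventions. Combined, they can express quite complex computations.

toolz functions can analyze large datasets and support common analysis patterns such as selection, grouping, reduction, and joining. These functions resemble their counterparts in SQL or Pandas.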

In [31]:
accounts = [(1, 'Alice', 100, 'F'),  # id, name, balance, gender
             (2, 'Bob', 200, 'M'),
             (3, 'Charlie', 150, 'M'),
             (4, 'Dennis', 50, 'M'),
             (5, 'Edith', 300, 'F')]

For example, the equivalent of the following SQL statement can be implemented like this:

```SQL
SELECT name, balance
FROM accounts
WHERE balance > 150;
```

In [33]:
from toolz.curried import pipe, map, filter, get
pipe(accounts, filter(lambda acc: acc[2] > 150),
                map(get([1, 2])),
                list)

[('Bob', 200), ('Edith', 300)]

Now back to `reduceby`. First, one concept: the split-apply-combine operation (see the [pandas documentation](https://pandas.pydata.org/pandas-docs/stable/user_guide/groupby.html)), which combines three steps:

- Splitting the data into groups based on some criteria.
- Applying a function to each group independently.
- Combining the results into a data structure.

In toolz, split-apply-combine is broken down into two concepts:

1. Split the dataset into groups by some property
2. Reduce each of the groups with some synopsis function

toolz offers two solutions for this workflow:

1. a simple in-memory solution
2. a more sophisticated streaming solution.

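The first, in-memory split-apply-combine, mainly uses `groupby` to split and `valmap` to apply/combine; it loads the complete dataset into memory as a dictionary. We skip the details here and go straight to the second.

The second is streaming split-apply-combine. The in-memory approach is clearly limited when the dataset is large, because it is not streaming; the streaming approach is what `toolz.reduceby` provides.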

When using `foldby` you provide 

1.  A key function on which to group elements
2.  A binary operator such as you would pass to `reduce` that you use to perform the reduction for each group
3.  A combine binary operator that can combine the results of two `reduce` calls on different parts of your dataset.

Your reduction must be associative.  It will happen in parallel in each of the partitions of your dataset.  Then all of these intermediate results will be combined by the `combine` binary operator.
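
The two-stage process described above can be sketched in plain Python: reduce per key inside each partition, then merge the per-partition results. `foldby_sketch` is a hypothetical illustration under simplifying assumptions (partitions are plain lists, work is sequential, and there is no `combine_initial`); it is not Dask's implementation.

```python
from operator import add

def foldby_sketch(partitions, key, binop, initial, combine):
    """Per-partition keyed reduction followed by a combine step (illustrative only)."""
    # Stage 1: within each partition, fold items into one accumulator per key.
    partials = []
    for part in partitions:
        acc = {}
        for item in part:
            k = key(item)
            acc[k] = binop(acc.get(k, initial), item)
        partials.append(acc)

    # Stage 2: merge the per-partition accumulators with `combine`.
    total = {}
    for acc in partials:
        for k, v in acc.items():
            total[k] = combine(total[k], v) if k in total else v
    return sorted(total.items())

def incr(tot, _):
    return tot + 1

# Hypothetical data split across two partitions.
name_partitions = [['Alice', 'Bob', 'Alice'], ['Bob', 'Alice', 'Dan']]
counts = foldby_sketch(name_partitions, key=lambda name: name,
                       binop=incr, initial=0, combine=add)
print(counts)
```

Note the asymmetry: `binop` takes an accumulator and an item, while `combine` takes two accumulators. That is why counting needs `incr` for the first stage but `add` for the second.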

In [28]:
# Key function: 0 for even numbers, 1 for odd numbers.
# (Named `parity` so it does not shadow the `is_even` predicate defined earlier.)
parity = lambda x: x % 2
# Combined reduction and groupby.
# b.foldby(key, binop, init) is equivalent to:
# def reduction(group):
#     return reduce(binop, group, init)
# b.groupby(key).map(lambda kv: (kv[0], reduction(kv[1])))

# binop: Binary operator to reduce within each partition
# combine: Binary operator to combine results from binop
b.foldby(parity, binop=max, combine=max).compute()

[(0, 8), (1, 9)]

### Example with account data

We find the number of people with the same name.

In [29]:
%%time
# Warning, this one takes a while...
result = js.groupby(lambda item: item['name']).starmap(lambda k, v: (k, len(v))).compute()
print(sorted(result))

[('Alice', 204), ('Alice', 204), ('Alice', 221), ('Alice', 221), ('Bob', 324), ('Bob', 324), ('Bob', 351), ('Bob', 351), ('Charlie', 216), ('Charlie', 216), ('Charlie', 234), ('Charlie', 234), ('Dan', 288), ('Dan', 288), ('Dan', 312), ('Dan', 312), ('Edith', 285), ('Edith', 285), ('Edith', 310), ('Edith', 312), ('Frank', 287), ('Frank', 287), ('Frank', 311), ('Frank', 312), ('George', 155), ('George', 156), ('George', 168), ('George', 169), ('Hannah', 264), ('Hannah', 264), ('Hannah', 286), ('Hannah', 286), ('Ingrid', 216), ('Ingrid', 216), ('Ingrid', 234), ('Ingrid', 234), ('Jerry', 144), ('Jerry', 144), ('Jerry', 156), ('Jerry', 156), ('Kevin', 216), ('Kevin', 216), ('Kevin', 234), ('Kevin', 234), ('Laura', 225), ('Laura', 225), ('Laura', 243), ('Laura', 243), ('Michael', 322), ('Michael', 324), ('Michael', 347), ('Michael', 350), ('Norbert', 215), ('Norbert', 216), ('Norbert', 226), ('Norbert', 232), ('Oliver', 132), ('Oliver', 132), ('Oliver', 142), ('Oliver', 142), ('Patricia', 15

In [30]:
%%time
# This one is comparatively fast and produces the same result.
from operator import add
def incr(tot, _):
    return tot+1

result = js.foldby(key='name', 
                   binop=incr, 
                   initial=0, 
                   combine=add, 
                   combine_initial=0).compute()
print(sorted(result))

[('Alice', 850), ('Bob', 1350), ('Charlie', 900), ('Dan', 1200), ('Edith', 1192), ('Frank', 1197), ('George', 648), ('Hannah', 1100), ('Ingrid', 900), ('Jerry', 600), ('Kevin', 900), ('Laura', 936), ('Michael', 1343), ('Norbert', 889), ('Oliver', 548), ('Patricia', 600), ('Quinn', 950), ('Ray', 850), ('Sarah', 900), ('Tim', 1298), ('Ursula', 850), ('Victor', 797), ('Wendy', 1400), ('Xavier', 900), ('Yvonne', 979), ('Zelda', 850)]
Wall time: 2.47 s


### Exercise: compute total amount per name

We want to groupby (or foldby) the `name` key, then add up all of the amounts for each name.

Steps

1.  Create a small function that, given a dictionary like 

        {'name': 'Alice', 'transactions': [{'amount': 1, 'id': 123}, {'amount': 2, 'id': 456}]}
        
    produces the sum of the amounts, e.g. `3`
    
2.  Slightly change the binary operator of the `foldby` example above so that the binary operator doesn't count the number of entries, but instead accumulates the sum of the amounts.

In [None]:
# Your code here...

## DataFrames

For the same reasons that Pandas is often faster than pure Python, `dask.dataframe` can be faster than `dask.bag`.  We will work more with DataFrames later, but from the bag point of view, they are frequently the end-point of the "messy" part of data ingestion: once the data can be made into a data-frame, complex split-apply-combine logic becomes much more straightforward and efficient.

You can transform a bag with a simple tuple or flat dictionary structure into a `dask.dataframe` with the `to_dataframe` method.

In [None]:
df1 = js.to_dataframe()
df1.head()

This now looks like a well-defined DataFrame, and we can apply Pandas-like computations to it efficiently.

Using a Dask DataFrame, how long does it take to do our prior computation of numbers of people with the same name?  It turns out that `dask.dataframe.groupby()` beats `dask.bag.groupby()` by more than an order of magnitude, but it still cannot match `dask.bag.foldby()` for this case.

In [None]:
%time df1.groupby('name').id.count().compute().head()

### Denormalization

This DataFrame format is less than optimal because the `transactions` column is filled with nested data, so Pandas has to fall back to the `object` dtype, which is quite slow in Pandas.  Ideally we want to transform to a dataframe only after we have flattened our data so that each field is a single `int`, `string`, `float`, etc.

In [None]:
def denormalize(record):
    # returns a list for every nested item, each transaction of each person
    return [{'id': record['id'], 
             'name': record['name'], 
             'amount': transaction['amount'], 
             'transaction-id': transaction['transaction-id']}
            for transaction in record['transactions']]

transactions = js.map(denormalize).flatten()
transactions.take(3)
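
To see what `denormalize` returns without running the full pipeline, it can be applied to a single small record in plain Python. The sample record below reuses the first two transactions shown earlier for the Alice entry with `id` 9; the function body is copied from the cell above.

```python
def denormalize(record):
    # returns one flat dict per transaction of the given person
    return [{'id': record['id'],
             'name': record['name'],
             'amount': transaction['amount'],
             'transaction-id': transaction['transaction-id']}
            for transaction in record['transactions']]

sample = {'id': 9,
          'name': 'Alice',
          'transactions': [{'transaction-id': 433, 'amount': 278},
                           {'transaction-id': 606, 'amount': 284}]}

rows = denormalize(sample)
for row in rows:
    print(row)
```

Each nested transaction becomes its own flat row carrying the parent's `id` and `name`, which is the shape a DataFrame handles efficiently.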

In [None]:
df = transactions.to_dataframe()
df.head()

In [None]:
%%time
# number of transactions per name
# note that the time here includes the data load and ingestion
df.groupby('name')['transaction-id'].count().compute()

## Limitations

Bags provide very general computation (any Python function).  This generality
comes at a cost.  Bags have the following known limitations:

1.  Bag operations tend to be slower than array/dataframe computations in the
    same way that Python tends to be slower than NumPy/Pandas
2.  `Bag.groupby` is slow.  You should try to use `Bag.foldby` if possible.
    Using `Bag.foldby` requires more thought. Even better, consider creating
    a normalised dataframe.

## Learn More

* [Bag documentation](https://docs.dask.org/en/latest/bag.html)
* [Bag screencast](https://youtu.be/-qIiJ1XtSv0)
* [Bag API](https://docs.dask.org/en/latest/bag-api.html)
* [Bag examples](https://examples.dask.org/bag.html)

## Shutdown

In [None]:
client.shutdown()