<img src="images/dask_horizontal.svg" align="right" width="30%">

# Bag: Parallel Lists for semi-structured data

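Dask-bag excels at processing data that can be represented as a sequence of arbitrary inputs. We'll refer to this as "messy" data, because it can contain complex nested structures, missing fields, mixtures of data types, etc. The *functional* programming style fits very nicely with standard Python iteration, such as can be found in the `itertools` module.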

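Messy data is often encountered at the beginning of data processing pipelines, when large volumes of raw data are first consumed. The initial set of data might be JSON, CSV, XML, or any other format that does not enforce strict structure and datatypes.
For this reason, the initial data massaging and processing is often done with Python `list`s, `dict`s, and `set`s.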

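These core data structures are optimized for general-purpose storage and processing. Adding streaming computation with iterators/generator expressions or libraries like `itertools` or [`toolz`](https://toolz.readthedocs.io/en/latest/) lets us process large volumes of data in a small space. If we combine this with parallel processing then we can churn through a fair amount of data.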
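As a small illustration of the streaming style described above, the sketch below uses only the standard library. The sample lines and the `parse_amount` helper are made up for this example; the point is that a generator pipeline handles one item at a time instead of building intermediate lists.

```python
from itertools import islice

# Hypothetical raw input: one "name,amount" record per line, some malformed.
raw_lines = ["Alice,100", "Bob,oops", "Charlie,250", "Dan,", "Edith,75"]

def parse_amount(line):
    """Return the amount as an int, or None if the field is missing or invalid."""
    _, _, amount = line.partition(",")
    return int(amount) if amount.isdigit() else None

# Generator expressions are lazy: nothing is parsed until a consumer asks for items.
amounts = (parse_amount(line) for line in raw_lines)
valid = (a for a in amounts if a is not None)

first_two = list(islice(valid, 2))   # pulls only as many lines as needed
remaining_total = sum(valid)         # consumes the rest of the stream
print(first_two, remaining_total)    # [100, 250] 75
```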

Dask.bag is a high-level Dask collection that automates common workloads of this form.  In a nutshell

    dask.bag = map, filter, toolz + parallel execution
    
**Related Documentation**

* [Bag documentation](https://docs.dask.org/en/latest/bag.html)
* [Bag screencast](https://youtu.be/-qIiJ1XtSv0)
* [Bag API](https://docs.dask.org/en/latest/bag-api.html)
* [Bag examples](https://examples.dask.org/bag.html)

## Create data

In [1]:
%run prep.py -d accounts

Created CSV acccouts in 3.11s
Created JSON acccouts in 29.05s


## Setup

Again, we'll use the distributed scheduler. Schedulers will be explained in more depth [later](05_distributed.ipynb).

In [2]:
from dask.distributed import Client

client = Client(n_workers=4)

## Creation

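You can create a `Bag` from a Python sequence, from files, from data on S3, etc.

We demonstrate using `.take()` to show elements of the data. (Calling `.take(1)` returns a tuple with one element.)

Note that the data are partitioned into blocks, and there are many items per block. In the first example, the two partitions each contain five elements; in the following two, each file is partitioned into one or more blocks of bytes.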
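The partitioning idea can be pictured with a plain-Python sketch. The `split_into_partitions` helper below is hypothetical and only mimics the outcome described above (ten elements, two partitions of five); it is not Dask's actual implementation.

```python
from itertools import islice

def split_into_partitions(seq, npartitions):
    """Split a list into consecutive chunks of (nearly) equal size."""
    size = -(-len(seq) // npartitions)  # ceiling division
    return [seq[i:i + size] for i in range(0, len(seq), size)]

partitions = split_into_partitions(list(range(1, 11)), npartitions=2)
print(partitions)       # [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]

# Taking a few elements only needs to touch the first partition.
first_three = tuple(islice(partitions[0], 3))
print(first_three)      # (1, 2, 3)
```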

In [3]:
# each element is an integer
import dask.bag as db
b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], npartitions=2)
b.take(3)

(1, 2, 3)

In [5]:
# each element is a line of a text file, and each line is a JSON object
# note that the compression is handled automatically
import os
b = db.read_text(os.path.join('data', 'accounts.*.json.gz'))
b.take(1)

('{"id": 0, "name": "Jerry", "transactions": [{"transaction-id": 576, "amount": -679}, {"transaction-id": 854, "amount": -841}, {"transaction-id": 1193, "amount": -809}, {"transaction-id": 1506, "amount": -801}, {"transaction-id": 1572, "amount": -1008}, {"transaction-id": 2081, "amount": -923}, {"transaction-id": 3496, "amount": -750}, {"transaction-id": 4068, "amount": -793}, {"transaction-id": 4335, "amount": -849}, {"transaction-id": 4519, "amount": -819}, {"transaction-id": 4768, "amount": -682}, {"transaction-id": 5055, "amount": -548}, {"transaction-id": 5157, "amount": -607}, {"transaction-id": 5797, "amount": -896}, {"transaction-id": 6144, "amount": -901}, {"transaction-id": 6830, "amount": -930}, {"transaction-id": 8151, "amount": -924}, {"transaction-id": 9486, "amount": -778}, {"transaction-id": 10559, "amount": -1071}, {"transaction-id": 11374, "amount": -648}, {"transaction-id": 11770, "amount": -876}, {"transaction-id": 13495, "amount": -859}, {"transaction-id": 13871, 

In [6]:
# edit sources.py to configure source locations
import sources
sources.bag_url

's3://dask-data/nyc-taxi/2015/yellow_tripdata_2015-01.csv'

In [8]:
# Requires `s3fs` library
# each partition is a remote CSV text file
b = db.read_text(sources.bag_url,
                 storage_options={'anon': True})
b.take(1)

('VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RateCodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount\n',)

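## Manipulation

`Bag` objects hold the standard functional API found in projects like the Python standard library, `toolz`, or `pyspark`, including `map`, `filter`, `groupby`, etc.

Operations on `Bag` objects create new bags.  Call the `.compute()` method to trigger execution, as we saw for `Delayed` objects.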

In [9]:
def is_even(n):
    return n % 2 == 0

b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
c = b.filter(is_even).map(lambda x: x ** 2)
c

dask.bag<lambda, npartitions=10>

In [11]:
# blocking form: wait for completion (which is very fast in this case)
c.compute()

[4, 16, 36, 64, 100]

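### Example: Accounts JSON data

We've created a fake dataset of gzipped JSON data in your data directory. It is like the data used in the `DataFrame` example we will see later, except that all of the entries for each individual `id` have been bundled into a single record. This is similar to data that you might collect from a document store database or a web API.

Each line is a JSON-encoded dictionary with the following keys

*  id: Unique identifier of the customer
*  name: Name of the customer
*  transactions: List of `transaction-id`, `amount` pairs, one for each transaction for the customer in that file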
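For orientation, here is a hand-written line with the same shape as the records described above, parsed with the standard `json` module. The values are invented for illustration and are not taken from the dataset.

```python
import json

# A made-up line in the same format as the account files.
line = ('{"id": 7, "name": "Zed", "transactions": '
        '[{"transaction-id": 1, "amount": 10}, {"transaction-id": 2, "amount": -4}]}')

record = json.loads(line)                      # str -> dict
keys = sorted(record)
amounts = [t["amount"] for t in record["transactions"]]
print(keys)                                    # ['id', 'name', 'transactions']
print(record["name"], amounts)                 # Zed [10, -4]
```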

In [12]:
filename = os.path.join('data', 'accounts.*.json.gz')
lines = db.read_text(filename)
lines.take(3)

('{"id": 0, "name": "Jerry", "transactions": [{"transaction-id": 576, "amount": -679}, {"transaction-id": 854, "amount": -841}, {"transaction-id": 1193, "amount": -809}, {"transaction-id": 1506, "amount": -801}, {"transaction-id": 1572, "amount": -1008}, {"transaction-id": 2081, "amount": -923}, {"transaction-id": 3496, "amount": -750}, {"transaction-id": 4068, "amount": -793}, {"transaction-id": 4335, "amount": -849}, {"transaction-id": 4519, "amount": -819}, {"transaction-id": 4768, "amount": -682}, {"transaction-id": 5055, "amount": -548}, {"transaction-id": 5157, "amount": -607}, {"transaction-id": 5797, "amount": -896}, {"transaction-id": 6144, "amount": -901}, {"transaction-id": 6830, "amount": -930}, {"transaction-id": 8151, "amount": -924}, {"transaction-id": 9486, "amount": -778}, {"transaction-id": 10559, "amount": -1071}, {"transaction-id": 11374, "amount": -648}, {"transaction-id": 11770, "amount": -876}, {"transaction-id": 13495, "amount": -859}, {"transaction-id": 13871, 

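Our data comes out of the file as lines of text. Notice that file decompression happened automatically. We can make this data look more reasonable by mapping the `json.loads` function onto our bag.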

In [13]:
import json
js = lines.map(json.loads)
# take: inspect first few elements
js.take(3)

({'id': 0,
  'name': 'Jerry',
  'transactions': [{'transaction-id': 576, 'amount': -679},
   {'transaction-id': 854, 'amount': -841},
   {'transaction-id': 1193, 'amount': -809},
   {'transaction-id': 1506, 'amount': -801},
   {'transaction-id': 1572, 'amount': -1008},
   {'transaction-id': 2081, 'amount': -923},
   {'transaction-id': 3496, 'amount': -750},
   {'transaction-id': 4068, 'amount': -793},
   {'transaction-id': 4335, 'amount': -849},
   {'transaction-id': 4519, 'amount': -819},
   {'transaction-id': 4768, 'amount': -682},
   {'transaction-id': 5055, 'amount': -548},
   {'transaction-id': 5157, 'amount': -607},
   {'transaction-id': 5797, 'amount': -896},
   {'transaction-id': 6144, 'amount': -901},
   {'transaction-id': 6830, 'amount': -930},
   {'transaction-id': 8151, 'amount': -924},
   {'transaction-id': 9486, 'amount': -778},
   {'transaction-id': 10559, 'amount': -1071},
   {'transaction-id': 11374, 'amount': -648},
   {'transaction-id': 11770, 'amount': -876},
   {'t

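### Basic Queries

Once we parse our JSON data into proper Python objects (`dict`s, `list`s, etc.) we can perform more interesting queries by creating small Python functions to run on our data.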

In [14]:
# filter: keep only some elements of the sequence
js.filter(lambda record: record['name'] == 'Alice').take(5)

({'id': 4,
  'name': 'Alice',
  'transactions': [{'transaction-id': 78, 'amount': 1326},
   {'transaction-id': 856, 'amount': 1279},
   {'transaction-id': 1981, 'amount': 1337},
   {'transaction-id': 2410, 'amount': 1205},
   {'transaction-id': 3213, 'amount': 1173},
   {'transaction-id': 3600, 'amount': 1267},
   {'transaction-id': 4376, 'amount': 1254},
   {'transaction-id': 4643, 'amount': 1343},
   {'transaction-id': 4785, 'amount': 1306},
   {'transaction-id': 4888, 'amount': 1281},
   {'transaction-id': 5662, 'amount': 1242},
   {'transaction-id': 6028, 'amount': 1479},
   {'transaction-id': 6110, 'amount': 1295},
   {'transaction-id': 6932, 'amount': 1322},
   {'transaction-id': 7090, 'amount': 1311},
   {'transaction-id': 8596, 'amount': 1364},
   {'transaction-id': 9952, 'amount': 1295},
   {'transaction-id': 10740, 'amount': 1352},
   {'transaction-id': 10937, 'amount': 1210},
   {'transaction-id': 11100, 'amount': 1313},
   {'transaction-id': 12275, 'amount': 1328},
   {'tra

In [15]:
def count_transactions(d):
    return {'name': d['name'], 'count': len(d['transactions'])}

# map: apply a function to each element
(js.filter(lambda record: record['name'] == 'Alice')
   .map(count_transactions)
   .take(5))

({'name': 'Alice', 'count': 155},
 {'name': 'Alice', 'count': 20},
 {'name': 'Alice', 'count': 18},
 {'name': 'Alice', 'count': 275},
 {'name': 'Alice', 'count': 52})

In [16]:
# pluck: select a field, as from a dictionary, element[field]
(js.filter(lambda record: record['name'] == 'Alice')
   .map(count_transactions)
   .pluck('count')
   .take(5))

(155, 20, 18, 275, 52)

In [17]:
# Average number of transactions for all of the Alice entries
(js.filter(lambda record: record['name'] == 'Alice')
   .map(count_transactions)
   .pluck('count')
   .mean()
   .compute())

121.89916666666667
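The chain of `filter`, `map`, `pluck` and `mean` above corresponds roughly to the following plain-Python computation. The three records are made up for this sketch, so the numbers differ from the real dataset; only the shape of the pipeline is the same.

```python
from statistics import mean

# Made-up records with the same keys as the account data.
records = [
    {"id": 0, "name": "Alice", "transactions": [{"amount": 1}, {"amount": 2}]},
    {"id": 1, "name": "Bob", "transactions": [{"amount": 5}]},
    {"id": 2, "name": "Alice", "transactions": [{"amount": 3}, {"amount": 4},
                                                {"amount": 5}, {"amount": 6}]},
]

def count_transactions(d):
    return {"name": d["name"], "count": len(d["transactions"])}

alices = filter(lambda record: record["name"] == "Alice", records)  # like .filter
counted = map(count_transactions, alices)                            # like .map
counts = [d["count"] for d in counted]                               # like .pluck('count')
average = mean(counts)                                               # like .mean()
print(counts, average)                                               # [2, 4] 3
```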

### Use `flatten` to de-nest

In the example below we see the use of `.flatten()` to flatten results.  We compute the average amount for all transactions for all Alices.

In [None]:
js.filter(lambda record: record['name'] == 'Alice').pluck('transactions').take(3)

In [None]:
(js.filter(lambda record: record['name'] == 'Alice')
   .pluck('transactions')
   .flatten()
   .take(3))

In [None]:
(js.filter(lambda record: record['name'] == 'Alice')
   .pluck('transactions')
   .flatten()
   .pluck('amount')
   .take(3))

In [None]:
(js.filter(lambda record: record['name'] == 'Alice')
   .pluck('transactions')
   .flatten()
   .pluck('amount')
   .mean()
   .compute())
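What `.flatten()` contributes in the cells above can be seen on a tiny hand-made example. The sketch uses `itertools.chain.from_iterable` as a plain-Python stand-in for flattening; the nested lists are invented sample data.

```python
from itertools import chain

# Made-up result of .pluck('transactions'): one list of transactions per record.
plucked = [
    [{"transaction-id": 1, "amount": 10}, {"transaction-id": 2, "amount": 20}],
    [{"transaction-id": 3, "amount": 30}],
]

# Flattening removes one level of nesting: a list of lists becomes a single list.
flat = list(chain.from_iterable(plucked))
amounts = [t["amount"] for t in flat]        # like .pluck('amount')
average = sum(amounts) / len(amounts)        # like .mean()
print(len(flat), amounts, average)           # 3 [10, 20, 30] 20.0
```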

### Groupby and Foldby

Often we want to group data by some function or key.  We can do this either with the `.groupby` method, which is straightforward but forces a full shuffle of the data (expensive) or with the harder-to-use but faster `.foldby` method, which does a streaming combined groupby and reduction.

*  `groupby`:  Shuffles data so that all items with the same key are in the same key-value pair
*  `foldby`:  Walks through the data accumulating a result per key

*Note: the full `groupby` is particularly expensive because it shuffles the entire dataset. In actual workloads you would do well to use `foldby` or switch to `DataFrame`s if possible.*

### `groupby`

Groupby collects items in your collection so that all items with the same value under some function are collected together into a key-value pair.

In [None]:
b = db.from_sequence(['Alice', 'Bob', 'Charlie', 'Dan', 'Edith', 'Frank'])
b.groupby(len).compute()  # names grouped by length

In [None]:
b = db.from_sequence(list(range(10)))
b.groupby(lambda x: x % 2).compute()

In [None]:
b.groupby(lambda x: x % 2).starmap(lambda k, v: (k, max(v))).compute()
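For reference, the grouping performed in the cells above can be reproduced in plain Python. The `group_by` helper is a hypothetical stand-in written for this sketch; the order of groups and of items within a group returned by Dask may differ from what is shown here.

```python
from collections import defaultdict

def group_by(key, items):
    """Collect items into lists keyed by key(item)."""
    groups = defaultdict(list)
    for item in items:
        groups[key(item)].append(item)
    return dict(groups)

names = ['Alice', 'Bob', 'Charlie', 'Dan', 'Edith', 'Frank']
by_length = group_by(len, names)
print(sorted(by_length.items()))
# [(3, ['Bob', 'Dan']), (5, ['Alice', 'Edith', 'Frank']), (7, ['Charlie'])]

# Group by parity, then keep the largest value of each group.
by_parity = group_by(lambda x: x % 2, range(10))
largest = {k: max(v) for k, v in by_parity.items()}
print(largest)  # {0: 8, 1: 9}
```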

### `foldby`

Foldby can be quite odd at first.  It is similar to the following functions from other libraries:

*  [`toolz.reduceby`](http://toolz.readthedocs.io/en/latest/streaming-analytics.html#streaming-split-apply-combine)
*  [`pyspark.RDD.combineByKey`](http://abshinn.github.io/python/apache-spark/2014/10/11/using-combinebykey-in-apache-spark/)

When using `foldby` you provide 

1.  A key function on which to group elements
2.  A binary operator such as you would pass to `reduce`, used to perform the reduction within each group
3.  A combine binary operator that can combine the results of two `reduce` calls on different parts of your dataset.

Your reduction must be associative.  It will happen in parallel in each of the partitions of your dataset.  Then all of these intermediate results will be combined by the `combine` binary operator.
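The three ingredients above can be sketched in plain Python. This is only a conceptual model with hand-made partitions and hypothetical helper names (`fold_partition`, `combine_partials`); it is not how Dask implements `foldby` internally.

```python
def fold_partition(key, binop, partition):
    """Reduce one partition to a {key: accumulated value} dict."""
    acc = {}
    for item in partition:
        k = key(item)
        acc[k] = binop(acc[k], item) if k in acc else item
    return acc

def combine_partials(combine, partials):
    """Merge the per-partition dicts into one final dict."""
    out = {}
    for partial in partials:
        for k, v in partial.items():
            out[k] = combine(out[k], v) if k in out else v
    return out

# Two hand-made partitions standing in for the partitions of a bag.
partitions = [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]

# Step 1: each partition is reduced independently (this part can run in parallel).
partials = [fold_partition(lambda x: x % 2, max, p) for p in partitions]
print(partials)   # [{0: 4, 1: 3}, {1: 9, 0: 8}]

# Step 2: the intermediate results are merged with the combine operator.
result = combine_partials(max, partials)
print(result)     # {0: 8, 1: 9}
```

Because each partition is reduced on its own and the partial results are merged afterwards, the final answer only matches a single sequential reduction when the operation is associative, as `max` is here.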

In [None]:
# key function: 0 for even numbers, 1 for odd numbers
parity = lambda x: x % 2
b.foldby(parity, binop=max, combine=max).compute()

### Example with account data

We find the number of people with the same name.

In [None]:
%%time
# Warning, this one takes a while...
result = js.groupby(lambda item: item['name']).starmap(lambda k, v: (k, len(v))).compute()
print(sorted(result))

In [None]:
%%time
# This one is comparatively fast and produces the same result.
from operator import add
def incr(tot, _):
    return tot+1

result = js.foldby(key='name', 
                   binop=incr, 
                   initial=0, 
                   combine=add, 
                   combine_initial=0).compute()
print(sorted(result))
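To see why `binop` and `combine` are different functions in the `foldby` call above, it can help to run the same counting logic by hand. In this sketch the partitions and records are invented, and `count_by_name` is a hypothetical helper: `incr` receives a running total and one record, whereas `add` receives two running totals.

```python
from operator import add

def incr(tot, _):
    return tot + 1

# Two made-up partitions of account records (only the name matters here).
partitions = [
    [{"name": "Alice"}, {"name": "Bob"}, {"name": "Alice"}],
    [{"name": "Bob"}, {"name": "Alice"}],
]

def count_by_name(partition, initial=0):
    """Per-partition step: start each name at `initial`, then apply incr per record."""
    counts = {}
    for record in partition:
        name = record["name"]
        counts[name] = incr(counts.get(name, initial), record)
    return counts

partials = [count_by_name(p) for p in partitions]

# Combine step: add the per-partition counts, starting each name at 0.
totals = {}
for partial in partials:
    for name, count in partial.items():
        totals[name] = add(totals.get(name, 0), count)

print(sorted(totals.items()))  # [('Alice', 3), ('Bob', 2)]
```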

### Exercise: compute total amount per name

We want to groupby (or foldby) the `name` key, then add up all of the amounts for each name.

Steps

1.  Create a small function that, given a dictionary like 

        {'name': 'Alice', 'transactions': [{'amount': 1, 'id': 123}, {'amount': 2, 'id': 456}]}
        
    produces the sum of the amounts, e.g. `3`
    
2.  Slightly change the binary operator of the `foldby` example above so that the binary operator doesn't count the number of entries, but instead accumulates the sum of the amounts.

In [None]:
# Your code here...

## DataFrames

For the same reasons that Pandas is often faster than pure Python, `dask.dataframe` can be faster than `dask.bag`.  We will work more with DataFrames later, but from the point of view of a Bag, they are frequently the end-point of the "messy" part of data ingestion: once the data can be made into a dataframe, complex split-apply-combine logic becomes much more straightforward and efficient.

You can transform a bag with a simple tuple or flat dictionary structure into a `dask.dataframe` with the `to_dataframe` method.

In [None]:
df1 = js.to_dataframe()
df1.head()

This now looks like a well-defined DataFrame, and we can apply Pandas-like computations to it efficiently.

Using a Dask DataFrame, how long does it take to do our prior computation of numbers of people with the same name?  It turns out that `dask.dataframe.groupby()` beats `dask.bag.groupby()` by more than an order of magnitude, but it still cannot match `dask.bag.foldby()` for this case.

In [None]:
%time df1.groupby('name').id.count().compute().head()

### Denormalization

This DataFrame format is less than optimal because the `transactions` column is filled with nested data, so Pandas has to fall back to the `object` dtype, which is quite slow in Pandas.  Ideally we want to transform to a dataframe only after we have flattened our data so that each field of a record is a single `int`, `string`, `float`, etc.

In [None]:
def denormalize(record):
    # returns a list for every nested item, each transaction of each person
    return [{'id': record['id'], 
             'name': record['name'], 
             'amount': transaction['amount'], 
             'transaction-id': transaction['transaction-id']}
            for transaction in record['transactions']]

transactions = js.map(denormalize).flatten()
transactions.take(3)
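Applied by hand to a single record, the same `denormalize` function gives one flat row per transaction. The record below reuses the first two transactions of the `Jerry` record shown earlier; the list comprehension plays the role of `.map(denormalize).flatten()` in plain Python.

```python
def denormalize(record):
    # returns a list with one flat dict per transaction of this person
    return [{'id': record['id'],
             'name': record['name'],
             'amount': transaction['amount'],
             'transaction-id': transaction['transaction-id']}
            for transaction in record['transactions']]

records = [
    {'id': 0, 'name': 'Jerry',
     'transactions': [{'transaction-id': 576, 'amount': -679},
                      {'transaction-id': 854, 'amount': -841}]},
]

# One list per record, then one level of nesting removed.
rows = [row for record in records for row in denormalize(record)]
for row in rows:
    print(row)
# {'id': 0, 'name': 'Jerry', 'amount': -679, 'transaction-id': 576}
# {'id': 0, 'name': 'Jerry', 'amount': -841, 'transaction-id': 854}
```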

In [None]:
df = transactions.to_dataframe()
df.head()

In [None]:
%%time
# number of transactions per name
# note that the time here includes the data load and ingestion
df.groupby('name')['transaction-id'].count().compute()

## Limitations

Bags provide very general computation (any Python function).  This generality
comes at a cost.  Bags have the following known limitations:

1.  Bag operations tend to be slower than array/dataframe computations in the
    same way that Python tends to be slower than NumPy/Pandas
2.  ``Bag.groupby`` is slow.  You should try to use ``Bag.foldby`` if possible.
    Using ``Bag.foldby`` requires more thought. Even better, consider creating
    a normalised dataframe.

## Learn More

* [Bag documentation](https://docs.dask.org/en/latest/bag.html)
* [Bag screencast](https://youtu.be/-qIiJ1XtSv0)
* [Bag API](https://docs.dask.org/en/latest/bag-api.html)
* [Bag examples](https://examples.dask.org/bag.html)

## Shutdown

In [None]:
client.shutdown()