# Dask Bags


Dask Bag implements operations like `map`, `filter`, `groupby` and aggregations on collections of Python objects. It does this in parallel and with a small memory footprint by using Python iterators. It is similar to a parallel version of itertools or a Pythonic version of the PySpark RDD.

Dask Bags are often used to do simple preprocessing on log files, JSON records, or other user-defined Python objects.

Full API documentation is available here: http://docs.dask.org/en/latest/bag-api.html

## Start Dask Client for Dashboard

Starting the Dask Client is optional. It provides a dashboard that is useful for gaining insight into the computation.

The link to the dashboard will become visible when you create the client below. We recommend having it open on one side of your screen while using your notebook on the other side. Arranging your windows this way can take some effort, but seeing both at the same time is very useful when learning.

In [1]:
from dask.distributed import Client, progress
client = Client(n_workers=4, threads_per_worker=1)
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 38559 instead


| Property | Value |
| --- | --- |
| Connection method | Cluster object |
| Cluster type | distributed.LocalCluster |
| Dashboard | http://127.0.0.1:38559/status |
| Scheduler comm | tcp://127.0.0.1:41633 |
| Workers | 4 |
| Total threads | 4 |
| Total memory | 4.39 GiB |
| Status | running |
| Using processes | True |
| Started | Just now |

| Worker comm | Dashboard | Nanny | Threads | Memory | Local directory |
| --- | --- | --- | --- | --- | --- |
| tcp://127.0.0.1:42207 | http://127.0.0.1:44203/status | tcp://127.0.0.1:45983 | 1 | 1.10 GiB | /home/jovyan/work/SOSC22-livesessions/material/05-dask/dask-worker-space/worker-wgoha2kt |
| tcp://127.0.0.1:40375 | http://127.0.0.1:37633/status | tcp://127.0.0.1:41783 | 1 | 1.10 GiB | /home/jovyan/work/SOSC22-livesessions/material/05-dask/dask-worker-space/worker-8chrbvd3 |
| tcp://127.0.0.1:42577 | http://127.0.0.1:40795/status | tcp://127.0.0.1:39965 | 1 | 1.10 GiB | /home/jovyan/work/SOSC22-livesessions/material/05-dask/dask-worker-space/worker-n_9hnwdx |
| tcp://127.0.0.1:41025 | http://127.0.0.1:41447/status | tcp://127.0.0.1:42413 | 1 | 1.10 GiB | /home/jovyan/work/SOSC22-livesessions/material/05-dask/dask-worker-space/worker-fm86zfh3 |


## Create Random Data

We download a pre-generated set of random records, stored as ten JSON files, and unpack it to disk. This will serve as our data for this notebook.

In [3]:
!wget https://minio.131.154.96.42.myip.cloud.infn.it/public/data.tar.gz
!tar -xzvf data.tar.gz

--2022-12-01 08:26:03--  https://minio.131.154.96.42.myip.cloud.infn.it/public/data.tar.gz
Resolving minio.131.154.96.42.myip.cloud.infn.it (minio.131.154.96.42.myip.cloud.infn.it)... 131.154.96.42
Connecting to minio.131.154.96.42.myip.cloud.infn.it (minio.131.154.96.42.myip.cloud.infn.it)|131.154.96.42|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 626267 (612K) [application/x-gzip]
Saving to: ‘data.tar.gz’


2022-12-01 08:26:03 (111 MB/s) - ‘data.tar.gz’ saved [626267/626267]

data/0.json
data/1.json
data/2.json
data/3.json
data/4.json
data/5.json
data/6.json
data/7.json
data/8.json
data/9.json
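
The `!wget` and `!tar` lines are IPython shell escapes, so they only work inside a notebook on a system that has those tools. A rough pure-Python equivalent, sketched with the standard library's `urllib.request` and `tarfile` modules, could look like this; `download_and_extract` is a hypothetical helper, not part of Dask.

```python
import os
import tarfile
import urllib.request


def download_and_extract(url, dest="."):
    """Hypothetical helper: fetch a .tar.gz archive and unpack it into dest."""
    archive = os.path.join(dest, os.path.basename(url))
    urllib.request.urlretrieve(url, archive)  # roughly `wget URL`
    with tarfile.open(archive, "r:gz") as tar:  # roughly `tar -xzf archive`
        if hasattr(tarfile, "data_filter"):
            # Newer Python versions offer extraction filters that reject unsafe paths
            tar.extractall(dest, filter="data")
        else:
            tar.extractall(dest)
        return sorted(tar.getnames())
```

In this notebook it would be called as `download_and_extract("https://minio.131.154.96.42.myip.cloud.infn.it/public/data.tar.gz")`.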


## Read JSON data

Now that we have some JSON data on disk, let's take a look at it with Dask Bag and Python's `json` module.

In [4]:
!head -n 2 data/0.json

{"age": 28, "name": ["Shaun", "Marquez"], "occupation": "Supervisor", "telephone": "129-448-7343", "address": {"address": "396 Spring Valley Shore", "city": "Oak Creek"}, "credit-card": {"number": "4503 7878 0213 6768", "expiration-date": "03/18"}}
{"age": 60, "name": ["Johanne", "Medina"], "occupation": "Claims Assessor", "telephone": "+1-(899)-653-5634", "address": {"address": "971 Sunset Annex", "city": "Chino"}, "credit-card": {"number": "3770 545391 08904", "expiration-date": "07/24"}}


In [5]:
import dask.bag as db
import json

b = db.read_text('data/*.json').map(json.loads)
b

dask.bag<loads, npartitions=10>
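
`db.read_text` turns every line of every matched file into one string element and, when no `blocksize` is given, creates one partition per file, which is why the bag above reports `npartitions=10` for the ten files. The sketch below reproduces this on two small made-up files in a temporary directory; the records and file names are illustrative only, and `scheduler="sync"` is used just to keep the example in one process.

```python
import json
import os
import tempfile

import dask.bag as db

# Two made-up JSON-lines files: one JSON object per line
tmp = tempfile.mkdtemp()
rows = [[{"age": 28}, {"age": 60}], [{"age": 46}]]
for i, file_rows in enumerate(rows):
    with open(os.path.join(tmp, f"{i}.json"), "w") as f:
        for row in file_rows:
            f.write(json.dumps(row) + "\n")

lines = db.read_text(os.path.join(tmp, "*.json"))  # one string per line
records = lines.map(json.loads)                    # parse each line into a dict

print(lines.npartitions)  # 2: one partition per file
print(sorted(r["age"] for r in records.compute(scheduler="sync")))  # [28, 46, 60]
```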

In [6]:
b.take(2)

({'age': 28,
  'name': ['Shaun', 'Marquez'],
  'occupation': 'Supervisor',
  'telephone': '129-448-7343',
  'address': {'address': '396 Spring Valley Shore', 'city': 'Oak Creek'},
  'credit-card': {'number': '4503 7878 0213 6768',
   'expiration-date': '03/18'}},
 {'age': 60,
  'name': ['Johanne', 'Medina'],
  'occupation': 'Claims Assessor',
  'telephone': '+1-(899)-653-5634',
  'address': {'address': '971 Sunset Annex', 'city': 'Chino'},
  'credit-card': {'number': '3770 545391 08904', 'expiration-date': '07/24'}})

## Map, Filter, Aggregate

We can process this data by filtering it down to the records of interest, mapping functions over it to transform each record, and aggregating the results into a total value.

In [7]:
b.filter(lambda record: record['age'] > 30).take(2)  # Select only people over 30

({'age': 60,
  'name': ['Johanne', 'Medina'],
  'occupation': 'Claims Assessor',
  'telephone': '+1-(899)-653-5634',
  'address': {'address': '971 Sunset Annex', 'city': 'Chino'},
  'credit-card': {'number': '3770 545391 08904', 'expiration-date': '07/24'}},
 {'age': 46,
  'name': ['Wilbert', 'Tyson'],
  'occupation': 'Licensing',
  'telephone': '(991) 478-7506',
  'address': {'address': '656 Kirkham Mews', 'city': 'Ridgewood'},
  'credit-card': {'number': '3700 774076 86104', 'expiration-date': '02/25'}})

In [8]:
b.map(lambda record: record['occupation']).take(2)  # Select the occupation field

('Supervisor', 'Claims Assessor')

In [9]:
b.count().compute()  # Count total number of records

10000
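
For plain field access, `Bag.pluck(key)` is a shorthand for `map(lambda record: record[key])`, and numeric reductions such as `mean` work directly on a bag of numbers. A minimal sketch on a few in-memory records (ages and occupations copied from the sample output above), again run with `scheduler="sync"` for simplicity:

```python
import dask.bag as db

records = db.from_sequence([
    {"age": 28, "occupation": "Supervisor"},
    {"age": 60, "occupation": "Claims Assessor"},
    {"age": 46, "occupation": "Licensing"},
    {"age": 25, "occupation": "Laminator"},
], npartitions=2)

over_30 = records.filter(lambda record: record["age"] > 30)

occupations = over_30.pluck("occupation").compute(scheduler="sync")
n_over_30 = over_30.count().compute(scheduler="sync")
mean_age = over_30.pluck("age").mean().compute(scheduler="sync")

print(occupations)          # ['Claims Assessor', 'Licensing']
print(n_over_30, mean_age)  # 2 53.0
```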

## Chain computations

It is common to do many of these steps in one pipeline, only calling `compute` or `take` at the end.

In [10]:
result = (b.filter(lambda record: record['age'] > 30)
           .map(lambda record: record['occupation'])
           .frequencies(sort=True)
           .topk(10, key=1))
result

dask.bag<topk-aggregate, npartitions=1>

As with all lazy Dask collections, we need to call `compute` to actually evaluate our result. The `take` method used in earlier examples also triggers computation, but it returns only the first few elements and, by default, only looks at the first partition.

In [11]:
result.compute()

[('Training Assistant', 15),
 ('Counsellor', 15),
 ('Precision Engineer', 14),
 ('Assessor', 14),
 ('Gardener', 14),
 ('Records Supervisor', 14),
 ('Miner', 14),
 ('Auction Worker', 13),
 ('Leather Worker', 13),
 ('Restaurateur', 13)]
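
To see the laziness directly, and what `frequencies` and `topk(..., key=1)` return, here is a small sketch on made-up data. The hypothetical `tag` function appends each call to a list; this side-effect probe only works because `scheduler="sync"` runs the tasks in the current process. `frequencies` yields `(item, count)` pairs, and `key=1` tells `topk` to rank those pairs by their second element, the count.

```python
import dask.bag as db

calls = []


def tag(word):
    """Hypothetical helper: record that it ran, then return the word unchanged."""
    calls.append(word)
    return word


words = db.from_sequence(["a", "b", "a", "c", "a", "b"], npartitions=2)
pipeline = words.map(tag).frequencies().topk(2, key=1)

print(calls)       # []: building the pipeline runs nothing
top = pipeline.compute(scheduler="sync")
print(top)         # [('a', 3), ('b', 2)]
print(len(calls))  # 6: each element was mapped once during compute
```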

## Transform and Store

Sometimes we want to compute aggregations as above, but sometimes we want to store results to disk for future analyses. For that we can map `json.dumps` over the records to turn them back into text and write them out with `to_textfiles`, or we can convert to Dask Dataframes and use their storage systems, which we'll see more of in the next section.

In [12]:
(b.filter(lambda record: record['age'] > 30)  # Select records of interest
  .map(json.dumps)                            # Convert Python objects to text
  .to_textfiles('data/processed.*.json'))     # Write to local disk

['/home/jovyan/work/SOSC22-livesessions/material/05-dask/data/processed.0.json',
 '/home/jovyan/work/SOSC22-livesessions/material/05-dask/data/processed.1.json',
 '/home/jovyan/work/SOSC22-livesessions/material/05-dask/data/processed.2.json',
 '/home/jovyan/work/SOSC22-livesessions/material/05-dask/data/processed.3.json',
 '/home/jovyan/work/SOSC22-livesessions/material/05-dask/data/processed.4.json',
 '/home/jovyan/work/SOSC22-livesessions/material/05-dask/data/processed.5.json',
 '/home/jovyan/work/SOSC22-livesessions/material/05-dask/data/processed.6.json',
 '/home/jovyan/work/SOSC22-livesessions/material/05-dask/data/processed.7.json',
 '/home/jovyan/work/SOSC22-livesessions/material/05-dask/data/processed.8.json',
 '/home/jovyan/work/SOSC22-livesessions/material/05-dask/data/processed.9.json']
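
As a sketch of the full round trip, the snippet below writes a small made-up bag to a temporary directory and reads it back. The `*` in the output path is replaced by the partition index, so each partition becomes one file. Wrapping the calls in `dask.config.set(scheduler="sync")` is an assumption for the example that keeps everything in the current process.

```python
import json
import os
import tempfile

import dask
import dask.bag as db

records = db.from_sequence([{"age": 60}, {"age": 46}, {"age": 33}], npartitions=2)
out_dir = tempfile.mkdtemp()
pattern = os.path.join(out_dir, "processed.*.json")

with dask.config.set(scheduler="sync"):
    # Serialize each dict to one line of JSON text; one file per partition
    paths = records.map(json.dumps).to_textfiles(pattern)
    # Read the files back and parse each line again
    restored = db.read_text(pattern).map(json.loads).compute()

print(len(paths))                          # 2: one file per partition
print(sorted(r["age"] for r in restored))  # [33, 46, 60]
```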

## Convert to Dask Dataframes

Dask Bags are good for reading in raw data, doing some preprocessing, and then handing off to a more efficient representation such as Dask Dataframes. Dask Dataframes use Pandas internally, so they can be much faster on numeric data and offer more sophisticated algorithms.

However, Dask Dataframes also expect data that is organized as flat columns. They do not support nested JSON data very well (Bag is better for this).

Here we make a function to flatten down our nested data structure, map that across our records, and then convert that to a Dask Dataframe.

In [14]:
b.take(1)

({'age': 28,
  'name': ['Shaun', 'Marquez'],
  'occupation': 'Supervisor',
  'telephone': '129-448-7343',
  'address': {'address': '396 Spring Valley Shore', 'city': 'Oak Creek'},
  'credit-card': {'number': '4503 7878 0213 6768',
   'expiration-date': '03/18'}},)

In [13]:
def flatten(record):
    return {
        'age': record['age'],
        'occupation': record['occupation'],
        'telephone': record['telephone'],
        'credit-card-number': record['credit-card']['number'],
        'credit-card-expiration': record['credit-card']['expiration-date'],
        'name': ' '.join(record['name']),
        'street-address': record['address']['address'],
        'city': record['address']['city']   
    }

b.map(flatten).take(1)

({'age': 28,
  'occupation': 'Supervisor',
  'telephone': '129-448-7343',
  'credit-card-number': '4503 7878 0213 6768',
  'credit-card-expiration': '03/18',
  'name': 'Shaun Marquez',
  'street-address': '396 Spring Valley Shore',
  'city': 'Oak Creek'},)
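
The `flatten` function above is written by hand for this particular schema, which also lets it join the `name` list into a single string. If you only need to lift nested dictionaries into top-level keys, a generic recursive version is an alternative; `flatten_dict` below is a hypothetical helper that joins nested keys with `-` and leaves lists untouched.

```python
def flatten_dict(record, parent_key="", sep="-"):
    """Hypothetical helper: turn nested dicts into one flat dict.

    Nested keys are joined with `sep`; non-dict values (including lists)
    are copied as-is.
    """
    flat = {}
    for key, value in record.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            # Recurse into the nested dict, prefixing its keys
            flat.update(flatten_dict(value, new_key, sep))
        else:
            flat[new_key] = value
    return flat


record = {
    "age": 28,
    "name": ["Shaun", "Marquez"],
    "credit-card": {"number": "4503 7878 0213 6768", "expiration-date": "03/18"},
}
print(flatten_dict(record))
# {'age': 28, 'name': ['Shaun', 'Marquez'], 'credit-card-number': '4503 7878 0213 6768', 'credit-card-expiration-date': '03/18'}
```

With the bag from this notebook it would be used as `b.map(flatten_dict)`. Note that the generated keys (for example `credit-card-expiration-date` or `address-address`) differ from the hand-picked names in `flatten`, and `name` stays a list.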

In [15]:
df = b.map(flatten).to_dataframe()
df.head()

| | age | occupation | telephone | credit-card-number | credit-card-expiration | name | street-address | city |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 28 | Supervisor | 129-448-7343 | 4503 7878 0213 6768 | 03/18 | Shaun Marquez | 396 Spring Valley Shore | Oak Creek |
| 1 | 60 | Claims Assessor | +1-(899)-653-5634 | 3770 545391 08904 | 07/24 | Johanne Medina | 971 Sunset Annex | Chino |
| 2 | 25 | Laminator | 249.225.0416 | 5417 6978 2593 3350 | 04/19 | Heidy Dickson | 147 Hamlin Road | Miami |
| 3 | 24 | Studio Manager | 219-065-3646 | 3783 748950 96421 | 12/25 | Gertrudis Jacobs | 874 Dellbrook Ferry | Sycamore |
| 4 | 19 | Rally Driver | 152-654-8873 | 3497 277944 42234 | 01/18 | Nadene Mathews | 1219 Parkhurst Row | Bartlesville |


We can now perform the same computation as before, this time with a Dask Dataframe, which uses Pandas under the hood.

In [16]:
df[df.age > 30].occupation.value_counts().nlargest(10).compute()

Training Assistant    15
Counsellor            15
Miner                 14
Records Supervisor    14
Gardener              14
Assessor              14
Precision Engineer    14
Restaurateur          13
Leather Worker        13
Auction Worker        13
Name: occupation, dtype: int64

## Learn More

You may be interested in the following links:

-  [Dask Bag Documentation](https://docs.dask.org/en/latest/bag.html)
-  [API Documentation](http://docs.dask.org/en/latest/bag-api.html)
-  [Dask tutorial](https://github.com/dask/dask-tutorial), notebook 02, for a more in-depth introduction.