# Install dependencies
- `apache-beam[dask]`: core package being demonstrated here
- `bokeh`: for dask dashboard
- `mimesis`: required for generating example data
- `Pygments`: to print (`cat`) the example Beam script with syntax highlighting

In [1]:
# !pip install "apache-beam[dask]" "bokeh!=3.0.*,>=2.4.2" mimesis Pygments

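We pin the upper bound of `dask` and `distributed` to `2023.9.2` as a workaround until
[this fix](https://github.com/apache/beam/pull/27618/files#diff-bfb5ae715e9067778f492058e8a02ff877d6e7584624908ddbdd316853e6befbL102-R107)
is merged. Pinning `distributed` alone is enough, because each `distributed` release requires the matching `dask` release.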


In [2]:
# !pip install -U "distributed>=2022.6.0,<2023.9.3"
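To check that an existing environment respects this pin, the installed versions can be compared against the `2023.9.2` upper bound. The sketch below uses only the standard library (`importlib.metadata`); `parse_calver` and `within_upper_bound` are hypothetical helpers, and they assume version strings that begin with `YEAR.MONTH[.PATCH]`, as `dask` and `distributed` releases do. Versions are compared as integer tuples because plain string comparison would put `2023.10.0` before `2023.9.2`.

```python
import re
from importlib.metadata import PackageNotFoundError, version

# Upper bound from the pin above: distributed<2023.9.3, i.e. <= 2023.9.2.
UPPER_BOUND = (2023, 9, 2)

# Hypothetical helper regex: matches a leading YEAR.MONTH[.PATCH], ignoring suffixes.
_CALVER = re.compile(r"(\d+)\.(\d+)(?:\.(\d+))?")


def parse_calver(v: str) -> tuple:
    """Parse the leading 'YEAR.MONTH[.PATCH]' of a version string into ints."""
    m = _CALVER.match(v)
    if m is None:
        raise ValueError(f"not a YEAR.MONTH[.PATCH] version: {v!r}")
    year, month, patch = m.groups()
    return (int(year), int(month), int(patch or 0))


def within_upper_bound(v: str, upper: tuple = UPPER_BOUND) -> bool:
    """Return True if version `v` is at or below the pinned upper bound."""
    return parse_calver(v) <= upper


if __name__ == "__main__":
    for pkg in ("dask", "distributed"):
        try:
            installed = version(pkg)
        except PackageNotFoundError:
            print(f"{pkg}: not installed")
            continue
        status = "ok" if within_upper_bound(installed) else "above the pin"
        print(f"{pkg} {installed}: {status}")
```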

# Start a client

In [3]:
from distributed import Client
client = Client()
client.dashboard_link

'http://127.0.0.1:8787/status'

# Example data

This example data is based on https://examples.dask.org/bag.html#Create-Random-Data.

In [4]:
import dask
import json
import tempfile

td = tempfile.TemporaryDirectory()
dask.datasets.make_people().map(json.dumps).to_textfiles(f'{td.name}/*.json')

['/var/folders/tt/4f941hdn0zq549zdwhcgg98c0000gn/T/tmprfxai8wf/0.json',
 '/var/folders/tt/4f941hdn0zq549zdwhcgg98c0000gn/T/tmprfxai8wf/1.json',
 '/var/folders/tt/4f941hdn0zq549zdwhcgg98c0000gn/T/tmprfxai8wf/2.json',
 '/var/folders/tt/4f941hdn0zq549zdwhcgg98c0000gn/T/tmprfxai8wf/3.json',
 '/var/folders/tt/4f941hdn0zq549zdwhcgg98c0000gn/T/tmprfxai8wf/4.json',
 '/var/folders/tt/4f941hdn0zq549zdwhcgg98c0000gn/T/tmprfxai8wf/5.json',
 '/var/folders/tt/4f941hdn0zq549zdwhcgg98c0000gn/T/tmprfxai8wf/6.json',
 '/var/folders/tt/4f941hdn0zq549zdwhcgg98c0000gn/T/tmprfxai8wf/7.json',
 '/var/folders/tt/4f941hdn0zq549zdwhcgg98c0000gn/T/tmprfxai8wf/8.json',
 '/var/folders/tt/4f941hdn0zq549zdwhcgg98c0000gn/T/tmprfxai8wf/9.json']

Note that the data is in [JSON Lines](https://jsonlines.org) format, with one JSON record per line:

In [5]:
!head -n 2 {td.name}/0.json

{"age": 47, "name": ["Deadra", "Landry"], "occupation": "Moneylender", "telephone": "+1-956-698-8149", "address": {"address": "993 John Muir Creek", "city": "Murray"}, "credit-card": {"number": "5157 5211 2472 8259", "expiration-date": "10/23"}}
{"age": 26, "name": ["Shirleen", "Shaw"], "occupation": "Lawyer", "telephone": "+16190953288", "address": {"address": "578 Marina Ferry", "city": "Santee"}, "credit-card": {"number": "4541 9038 5492 9752", "expiration-date": "12/24"}}


# Dask

Read, load, and filter the data using the Dask Bag API.

This is based on https://examples.dask.org/bag.html#Chain-computations.

It omits the aggregation and top-k steps (`.frequencies`, `.topk`)
because these operations are not yet supported when running Beam on Dask
(see **Discussion > Next steps** below).

In [6]:
import dask.bag as db

b = (
    db
    .read_text(f'{td.name}/*.json')
    .map(json.loads)
    .filter(lambda record: record['age'] > 30)
    .filter(lambda record: record['name'][0].startswith('A'))
    .filter(lambda record: record['name'][1].startswith('B'))
    .filter(lambda record: record['occupation'].startswith('C'))
    .map(lambda record: (" ".join(record['name']), record['age'], record['occupation']))
)
b.compute()

[('August Bond', 54, 'Chimney Sweep'),
 ('Adan Bell', 41, 'Carpet Fitter'),
 ('Ashley Berry', 32, 'Cabinet Maker')]
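For reference, the omitted `.frequencies()` and `.topk()` steps are global aggregations rather than elementwise operations: `Bag.frequencies()` returns `(item, count)` pairs, and `Bag.topk(k, key=...)` returns the `k` largest elements, largest first. The sketch below shows what they compute on a small in-memory list, using standard-library stand-ins (`collections.Counter`, `heapq.nlargest`); it is an illustration of the semantics, not Dask or Beam code, and the `frequencies`/`topk` functions here are hypothetical.

```python
import heapq
from collections import Counter
from typing import Any, Callable, Hashable, Iterable, List, Tuple


def frequencies(items: Iterable[Hashable]) -> List[Tuple[Hashable, int]]:
    """Return one (item, count) pair per distinct item, like Bag.frequencies()."""
    return list(Counter(items).items())


def topk(items: Iterable[Any], k: int, key: Callable[[Any], Any]) -> List[Any]:
    """Return the k largest items by `key`, largest first, like Bag.topk(k, key=...)."""
    return heapq.nlargest(k, items, key=key)


# Rows from the Dask Bag output above: (name, age, occupation).
rows = [
    ("August Bond", 54, "Chimney Sweep"),
    ("Adan Bell", 41, "Carpet Fitter"),
    ("Ashley Berry", 32, "Cabinet Maker"),
]

# The two oldest people, ranked by the age field.
print(topk(rows, 2, key=lambda row: row[1]))

# Most common occupation: count occupations, then take the top pair by count.
print(topk(frequencies(row[2] for row in rows), 1, key=lambda pair: pair[1]))
```

Both operations need to see every element before producing a result, which is why they require aggregation support from the runner.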

# Beam

Read, load, and apply the same filters using the Beam API.

Beam's `DaskRunner` doesn't yet support running inside IPython, so we run the pipeline from a Python script:

In [7]:
!pygmentize -g example.py

import glob
import json
import sys

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.dask.dask_runner import DaskRunner


def yield_jsonlines(fname: str):
    with open(fname) as

We then run this pipeline on the _same Dask cluster_ that we used for the Dask Bag computation:

In [8]:
!python -W ignore example.py {td.name} --dask_client_address={client.scheduler.address}

('August Bond', 54, 'Chimney Sweep')
('Adan Bell', 41, 'Carpet Fitter')
('Ashley Berry', 32, 'Cabinet Maker')
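The listing of `example.py` above is cut off, so the following is only a sketch of how a script like it could express the same filters with Beam; it is not the author's file. It assumes the data directory is the first command-line argument and that the remaining flags (such as `--dask_client_address`, as used in the command above) are passed to `PipelineOptions`. The helper names `keep`, `to_row`, and `run`, and the body of `yield_jsonlines`, are hypothetical. Beam is imported inside `run()` so that the pure-Python helpers can be used without Beam installed.

```python
import glob
import json
import sys
from typing import Any, Dict, Iterator, List, Tuple


def yield_jsonlines(fname: str) -> Iterator[Dict[str, Any]]:
    """Hypothetical reader: yield one parsed record per non-blank line."""
    with open(fname) as f:
        for line in f:
            if line.strip():
                yield json.loads(line)


def keep(record: Dict[str, Any]) -> bool:
    """Same predicates as the Dask Bag .filter() chain above."""
    return (
        record["age"] > 30
        and record["name"][0].startswith("A")
        and record["name"][1].startswith("B")
        and record["occupation"].startswith("C")
    )


def to_row(record: Dict[str, Any]) -> Tuple[str, int, str]:
    """Project a record to (full name, age, occupation)."""
    return (" ".join(record["name"]), record["age"], record["occupation"])


def run(argv: List[str]) -> None:
    # Imported here so the helpers above work without Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.runners.dask.dask_runner import DaskRunner

    # Assumed CLI: DATA_DIR first, then Beam flags such as --dask_client_address.
    data_dir, beam_args = argv[0], argv[1:]
    options = PipelineOptions(beam_args)
    with beam.Pipeline(runner=DaskRunner(), options=options) as p:
        (
            p
            | "ListFiles" >> beam.Create(sorted(glob.glob(f"{data_dir}/*.json")))
            | "ReadLines" >> beam.FlatMap(yield_jsonlines)
            | "Filter" >> beam.Filter(keep)
            | "Project" >> beam.Map(to_row)
            | "Print" >> beam.Map(print)
        )


# Only run the pipeline when a data directory is given on the command line.
if __name__ == "__main__" and len(sys.argv) > 1:
    run(sys.argv[1:])
```

With a distributed cluster, `beam.Map(print)` executes on the Dask workers, so the printed rows may appear in the worker processes' output rather than in the script's own output.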


```
NOTE(cisaacstern): How well does GroupBy work on dask already and should we highlight that in the example?
```

# Discussion

## Why

<ul style="list-style-type:none;">
  <li>
    💡 Beam has a large existing community but limited deployment options for HPC and AWS.
    Dask can fill that gap!
  </li>
  <li>
    💡 The <a href="https://pangeo-forge.readthedocs.io/en/latest/">Pangeo Forge</a>
    community is motivated by this.
    <a href="https://xarray-beam.readthedocs.io/en/latest/">Xarray-Beam</a>
    is another interesting use case.
  </li>
  <li>
    💡 ... more motivation here ...
  </li>
</ul>

## What works

<ul style="list-style-type:none;">
  <li>✅ Beam pipelines can be compiled to run against various runners, including Dask!</li>
  <li>✅ Elementwise operations (map, flatmap, filter, etc.) are already supported.</li>
  <li>✅ ...more existing successes here... </li>
</ul>

## Next steps

<ul style="list-style-type:none;">
  <li>⭕ Aggregations/reductions (frequencies, folds, etc.) are not implemented, but can be!</li>
  <li>⭕ ... more discussion points here ...</li>
  <li>⭕ ... more discussion points here ...</li>
</ul>
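One way to see why aggregations should be implementable: Beam models them with `CombineFn`, whose four methods (`create_accumulator`, `add_input`, `merge_accumulators`, `extract_output`) separate per-partition accumulation from merging. That is the same two-phase shape as Dask Bag's `reduction(perpartition, aggregate)`, which is one candidate target for a runner. The pure-Python sketch below computes a mean this way. `MeanFn` only mirrors the `CombineFn` method names and does not subclass Beam, and `combine_partitioned` is a hypothetical stand-in for how a runner might evaluate it, not Beam's or Dask's implementation.

```python
import math
from typing import Iterable, List, Sequence, Tuple

# Accumulator for a mean: (running sum, element count).
Accumulator = Tuple[float, int]


class MeanFn:
    """Mean aggregation using the method names of Beam's CombineFn (no Beam import)."""

    def create_accumulator(self) -> Accumulator:
        return (0.0, 0)

    def add_input(self, acc: Accumulator, element: float) -> Accumulator:
        total, count = acc
        return (total + element, count + 1)

    def merge_accumulators(self, accs: Iterable[Accumulator]) -> Accumulator:
        total, count = 0.0, 0
        for t, c in accs:
            total += t
            count += c
        return (total, count)

    def extract_output(self, acc: Accumulator) -> float:
        total, count = acc
        return total / count if count else math.nan


def combine_partitioned(fn: MeanFn, partitions: Sequence[Iterable[float]]) -> float:
    """Hypothetical two-phase evaluation of a CombineFn-style object."""
    # Phase 1 (cf. Dask's `perpartition`): one accumulator per partition.
    accumulators: List[Accumulator] = []
    for part in partitions:
        acc = fn.create_accumulator()
        for element in part:
            acc = fn.add_input(acc, element)
        accumulators.append(acc)
    # Phase 2 (cf. Dask's `aggregate`): merge partial results, then finish.
    return fn.extract_output(fn.merge_accumulators(accumulators))


# Ages from the example output, split across two "partitions".
print(combine_partitioned(MeanFn(), [[54], [41, 32]]))
```

Because merging is associative, the result does not depend on how elements are split across partitions, which is what lets the work be distributed.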
