# dask.bag and dask.dataframe

By convention, `dask.bag` and `dask.dataframe` are imported under the aliases `db` and `dd`, respectively.

In [None]:
from distributed import Client
client = Client("127.0.0.1:61329")

In [None]:
import dask.bag as db
import dask.dataframe as dd

The simplest bag example possible:

In [None]:
bag = db.from_sequence([1, 2, 3, 4, 5, 6], npartitions=2)

In [None]:
evens_squared = bag.filter(lambda x: x % 2 == 0).map(lambda x: x ** 2)

In [None]:
evens_squared.compute()

Let's look at something a bit more interesting: US Federal Election Commission (FEC) political campaign contribution data, stored in a number of plain text files.

In [None]:
from pathlib import Path
columns = Path("data/fec/columns.csv").read_text().strip().split(",")
columns

In [None]:
!head -n 3 data/fec/contributions/d100.txt

We'll create a bag by:
- reading the text files (with `read_text`),
- doing a bit of per-line processing (with `map`),
- and limiting the dataset to Atlanta, GA (with `filter`).

In [None]:
bag = (db.read_text("data/fec/contributions/*")
         # Each line is pipe-delimited; drop the trailing newline before splitting.
         .map(lambda x: dict(zip(columns, x.rstrip("\n").split("|"))))
         .filter(lambda x: x["STATE"] == "GA" and x["CITY"] == "ATLANTA"))
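
To make the per-line step concrete, here is a minimal sketch of what `map` and `filter` do to a single line, in plain Python. The shortened column list and the sample line are made up for illustration; the real files have many more fields. The sketch also strips the trailing newline that each line read from a text file carries.

```python
# Hypothetical, shortened column list and sample line (not real FEC data).
columns = ["NAME", "CITY", "STATE", "TRANSACTION_AMT"]
line = "DOE, JANE|ATLANTA|GA|250\n"

# The `map` step: pair each column name with the matching pipe-delimited field.
record = dict(zip(columns, line.rstrip("\n").split("|")))

# The `filter` step: keep the record only if it is from Atlanta, GA.
keep = record["STATE"] == "GA" and record["CITY"] == "ATLANTA"

print(record)
print(keep)
```

Note that every value in `record` is a string, including `TRANSACTION_AMT`.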

A look at the most frequent donation amounts:

In [None]:
freqs = bag.map(lambda x: x["TRANSACTION_AMT"]).frequencies(sort=True)

In [None]:
freqs.take(10)
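
As a rough illustration of what `frequencies(sort=True)` returns, the sketch below runs it on a tiny in-memory bag. The amounts are invented, and `scheduler="synchronous"` is used here only to keep the example single-process; neither is part of the FEC workflow above.

```python
import dask.bag as db

# Hypothetical amounts, kept as strings like the parsed FEC fields.
amounts = db.from_sequence(["25", "50", "25", "100", "25", "50"], npartitions=2)

# Count occurrences of each distinct value, most common first.
counts = amounts.frequencies(sort=True).compute(scheduler="synchronous")

print(counts)
```

Each element of the result is a `(value, count)` tuple.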

We can convert the existing bag to a dataframe to take advantage of both the pandas-like API and the performance of the pandas backend.

In [None]:
df = bag.to_dataframe(columns=columns)

We can see what the average contribution is in Atlanta:

In [None]:
df.TRANSACTION_AMT.mean()

Ah, well, that didn't work: the values in `TRANSACTION_AMT` are still strings, since we parsed them from text. Let's use our pandas-like API for some help:

In [None]:
ta = dd.to_numeric(df.TRANSACTION_AMT, errors="coerce")
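
`dd.to_numeric` follows the pandas function of the same name, so the effect of `errors="coerce"` can be sketched with plain pandas on a few made-up values: anything that cannot be parsed as a number becomes `NaN` instead of raising an error.

```python
import pandas as pd

# Hypothetical raw values: two valid amounts and two unparseable entries.
raw = pd.Series(["250", "1000", "", "N/A"])

# Unparseable entries are coerced to NaN rather than raising.
amounts = pd.to_numeric(raw, errors="coerce")

# NaN values are skipped by default when taking the mean.
mean_amount = amounts.mean()

print(amounts)
print(mean_amount)
```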

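Using `persist` instead of `compute` runs the computation but keeps the result in memory on the workers as a Dask collection, rather than returning it to the local process. Later operations then start from the stored result instead of recomputing it.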

In [None]:
ta = ta.persist()

In [None]:
ta = ta[(ta > 0) & (ta < 20000)]
ta.mean().compute()

In [None]:
ta[ta > 150].mean().compute()

In [None]:
import matplotlib.pyplot as plt
n, bins, patches = plt.hist(ta[ta < 200].to_dask_array().compute(), bins=50)

In [None]:
ta.value_counts().nlargest(10).compute()