In [1]:
from dask.distributed import Client
import dask.bag as db
import os
import json

# Connect a Dask distributed client configured with four workers.
client = Client(n_workers=4)


# Dask bags
- Dask bags are meant to replicate the style of PySpark.

In [2]:
# Put the integers 1..10 into a bag split across four partitions,
# then peek at the first three elements.
b = db.from_sequence(list(range(1, 11)), npartitions=4)
b.take(3)

(1, 2, 3)

In [3]:
def is_even(n):
    """Return True when ``n`` leaves no remainder on division by two."""
    remainder = n % 2
    return remainder == 0

# Rebuild the bag without an explicit partition count, keep the even
# numbers and square each of them. `c` is itself a bag.
b = db.from_sequence(list(range(1, 11)))
c = (
    b.filter(is_even)
     .map(lambda value: value ** 2)
)
c

dask.bag<lambda-..., npartitions=10>

In [4]:
# Evaluate the bag and collect its elements into a list.
c.compute()

[4, 16, 36, 64, 100]

# chaining

In [8]:
# Filter, square and sum in one chained expression.
(b.filter(is_even)
  .map(lambda value: value ** 2)
  .sum()
  .compute())

220

# JSON example

In [89]:
import dask.bag as db
import json

# Load the accounts file as a bag of text and parse every element with json.loads.
b = (
    db.read_text('example-data/accounts/accounty.json')
      .map(json.loads)
)

In [90]:
# Inspect the first two parsed records.
b.take(2)

({'id': 86,
  'name': 'Alice',
  'transactions': [{'transaction-id': 8803, 'amount': 1415}]},
 {'id': 1,
  'name': 'Norbert',
  'transactions': [{'transaction-id': 8801, 'amount': 1413}]})

In [91]:
# filter: keep only the records whose 'name' field equals 'Alice'
(b.filter(lambda account: account['name'] == 'Alice')
  .take(5))

({'id': 86,
  'name': 'Alice',
  'transactions': [{'transaction-id': 8803, 'amount': 1415}]},
 {'id': 45,
  'name': 'Alice',
  'transactions': [{'transaction-id': 8802, 'amount': 1416},
   {'transaction-id': 184, 'amount': 3349}]})

In [92]:
def count_transactions(d):
    """Summarise a record as its name and its number of transactions.

    ``d`` is indexed with the keys 'name' and 'transactions'; the result is
    a new dict holding 'name' and 'count' (the length of 'transactions').
    """
    summary = dict(name=d['name'])
    summary['count'] = len(d['transactions'])
    return summary

# map: apply a function to each element
alice_only = lambda account: account['name'] == 'Alice'
b.filter(alice_only).map(count_transactions).take(5)

({'name': 'Alice', 'count': 1}, {'name': 'Alice', 'count': 2})

In [94]:
# pluck: select a field, as from a dictionary, element[field]
(
    b.filter(lambda account: account['name'] == 'Alice')
     .map(count_transactions)
     .pluck('count')
     .take(5)
)

(1, 2)

In [99]:
# Average number of transactions over the records named 'Alice'.
(
    b.filter(lambda account: account['name'] == 'Alice')
     .map(count_transactions)
     .pluck('count')
     .mean()
     .compute()
)

1.5

In [101]:
# Flatten to denest
# Before flattening: each plucked element is still a list of transactions.
(
    b.filter(lambda account: account['name'] == 'Alice')
     .pluck('transactions')
     .take(5)
)

([{'transaction-id': 8803, 'amount': 1415}],
 [{'transaction-id': 8802, 'amount': 1416},
  {'transaction-id': 184, 'amount': 3349}])

In [103]:
# flatten: merge the per-record transaction lists into one stream,
# then pull the 'amount' out of each transaction.
(
    b.filter(lambda account: account['name'] == 'Alice')
     .pluck('transactions')
     .flatten()
     .pluck('amount')
     .take(3)
)

(1415, 1416, 3349)

In [105]:
# Mean transaction amount across all transactions of records named 'Alice'.
(
    b.filter(lambda account: account['name'] == 'Alice')
     .pluck('transactions')
     .flatten()
     .pluck('amount')
     .mean()
     .compute()
)

2060.0

# group-by

In [106]:
# Build a bag of names and group them by the value `len` returns for each,
# yielding (length, [names...]) pairs.
b = db.from_sequence(['Alice', 'Bob', 'Charlie', 'Dan', 'Edith', 'Frank'])
b.groupby(len).compute()  # names grouped by length

[(7, ['Charlie']), (3, ['Bob', 'Dan']), (5, ['Alice', 'Edith', 'Frank'])]

In [107]:
# Group the integers 0..9 by their remainder modulo two.
b = db.from_sequence(list(range(10)))
(b.groupby(lambda n: n % 2)
  .compute())

[(0, [0, 2, 4, 6, 8]), (1, [1, 3, 5, 7, 9])]

In [108]:
# For each (key, members) group, keep only the largest member.
(b.groupby(lambda n: n % 2)
  .starmap(lambda parity, members: (parity, max(members)))
  .compute())

[(0, 8), (1, 9)]

# JSON example 2

In [67]:
import dask.bag as db
import json

# Load the second example file as a bag of text and parse every element with json.loads.
b = (
    db.read_text('example-data/accounts/accountx.json')
      .map(json.loads)
)

In [68]:
# Show the bag's summary repr, which includes its partition count.
b

dask.bag<loads-1..., npartitions=1>

In [69]:
# Inspect the first two parsed records to see their structure.
b.take(2)

({'age': 45,
  'name': ['Ellyn', 'Head'],
  'occupation': 'Health Visitor',
  'telephone': '+1-(485)-639-9596',
  'address': {'address': '955 Flora Extension', 'city': 'Lake Zurich'},
  'credit-card': {'number': '4193 2628 9870 3827',
   'expiration-date': '06/22'}},
 {'age': 45,
  'name': ['Ta', 'Schmidt'],
  'occupation': 'Optical Advisor',
  'telephone': '357-161-9324',
  'address': {'address': '336 Sutro Heights Townline', 'city': 'Manchester'},
  'credit-card': {'number': '3741 717460 63871', 'expiration-date': '12/19'}})

In [70]:
# Look up the record with one specific telephone number.
(b.filter(lambda person: person["telephone"] == "357-161-9324")
  .take(1))

({'age': 45,
  'name': ['Ta', 'Schmidt'],
  'occupation': 'Optical Advisor',
  'telephone': '357-161-9324',
  'address': {'address': '336 Sutro Heights Townline', 'city': 'Manchester'},
  'credit-card': {'number': '3741 717460 63871', 'expiration-date': '12/19'}},)

In [71]:
# pluck('occupation') selects the 'occupation' field of every record.
b.pluck('occupation').take(2)

('Health Visitor', 'Optical Advisor')

In [72]:
# count() is a reduction over the whole bag; compute() evaluates it to a number.
b.count().compute()  # Count total number of records

2

In [73]:
# Keep only the records whose 'age' is above 30 and show the first two.
(b.filter(lambda person: person['age'] > 30)
  .take(2))

({'age': 45,
  'name': ['Ellyn', 'Head'],
  'occupation': 'Health Visitor',
  'telephone': '+1-(485)-639-9596',
  'address': {'address': '955 Flora Extension', 'city': 'Lake Zurich'},
  'credit-card': {'number': '4193 2628 9870 3827',
   'expiration-date': '06/22'}},
 {'age': 45,
  'name': ['Ta', 'Schmidt'],
  'occupation': 'Optical Advisor',
  'telephone': '357-161-9324',
  'address': {'address': '336 Sutro Heights Townline', 'city': 'Manchester'},
  'credit-card': {'number': '3741 717460 63871', 'expiration-date': '12/19'}})

# chain computations

In [75]:
# Occupation frequencies among records with age > 30, keeping the ten
# largest; key=1 ranks each (occupation, count) pair by its count.
result = (
    b.filter(lambda person: person['age'] > 30)
     .map(lambda person: person['occupation'])
     .frequencies(sort=True)
     .topk(10, key=1)
)
result.compute()

[('Health Visitor', 1), ('Optical Advisor', 1)]

# Dask Bag to Dask Dataframe

In [76]:
def flatten(record):
    """Turn one nested record into a flat, single-level dict.

    The nested 'credit-card' and 'address' dicts are lifted into top-level
    keys, and the 'name' sequence is joined into one space-separated string.
    The input record is not modified.
    """
    flat = {}
    flat['age'] = record['age']
    flat['occupation'] = record['occupation']
    flat['telephone'] = record['telephone']

    # Lift the credit-card sub-dict into two top-level columns.
    card = record['credit-card']
    flat['credit-card-number'] = card['number']
    flat['credit-card-expiration'] = card['expiration-date']

    flat['name'] = ' '.join(record['name'])

    # Lift the address sub-dict into two top-level columns.
    location = record['address']
    flat['street-address'] = location['address']
    flat['city'] = location['city']
    return flat

In [79]:
# Apply flatten to every record and check the first two flattened results.
b.map(flatten).take(2)

({'age': 45,
  'occupation': 'Health Visitor',
  'telephone': '+1-(485)-639-9596',
  'credit-card-number': '4193 2628 9870 3827',
  'credit-card-expiration': '06/22',
  'name': 'Ellyn Head',
  'street-address': '955 Flora Extension',
  'city': 'Lake Zurich'},
 {'age': 45,
  'occupation': 'Optical Advisor',
  'telephone': '357-161-9324',
  'credit-card-number': '3741 717460 63871',
  'credit-card-expiration': '12/19',
  'name': 'Ta Schmidt',
  'street-address': '336 Sutro Heights Townline',
  'city': 'Manchester'})

In [80]:
# Convert the bag of flat dicts into a Dask dataframe.
# NOTE(review): no `meta` is passed, so column names/dtypes are presumably
# inferred from the data — confirm this is acceptable for larger inputs.
dask_df = b.map(flatten).to_dataframe()

In [83]:
# Dask dataframe operations can now be chained and applied:
# the ten most common occupations among rows with age > 30.
(
    dask_df[dask_df.age > 30]
    .occupation
    .value_counts()
    .nlargest(10)
    .compute()
)

Optical Advisor    1
Health Visitor     1
Name: occupation, dtype: int64