In [1]:
from dask.distributed import Client, progress
client = Client(n_workers=4, threads_per_worker=1)
client

0,1
Client  Scheduler: tcp://127.0.0.1:33271  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 4  Memory: 16.50 GB


# Generate Data

In [4]:
import dask
import json

fldr = 'data_bag/json'     # data directory 
b = dask.datasets.make_people(seed=0)

In [2]:
b

dask.bag<mimesis, npartitions=10>

In [5]:
b.map(json.dumps).to_textfiles(f'{fldr}/*.json')

['/media/data/Projects/Dask-Book/Machine-Learning-and-Data-Analysis-with-Dask/Chapter2/data_bag/json/0.json',
 '/media/data/Projects/Dask-Book/Machine-Learning-and-Data-Analysis-with-Dask/Chapter2/data_bag/json/1.json',
 '/media/data/Projects/Dask-Book/Machine-Learning-and-Data-Analysis-with-Dask/Chapter2/data_bag/json/2.json',
 '/media/data/Projects/Dask-Book/Machine-Learning-and-Data-Analysis-with-Dask/Chapter2/data_bag/json/3.json',
 '/media/data/Projects/Dask-Book/Machine-Learning-and-Data-Analysis-with-Dask/Chapter2/data_bag/json/4.json',
 '/media/data/Projects/Dask-Book/Machine-Learning-and-Data-Analysis-with-Dask/Chapter2/data_bag/json/5.json',
 '/media/data/Projects/Dask-Book/Machine-Learning-and-Data-Analysis-with-Dask/Chapter2/data_bag/json/6.json',
 '/media/data/Projects/Dask-Book/Machine-Learning-and-Data-Analysis-with-Dask/Chapter2/data_bag/json/7.json',
 '/media/data/Projects/Dask-Book/Machine-Learning-and-Data-Analysis-with-Dask/Chapter2/data_bag/json/8.json',
 '/media/d

# Read Data

**Questions**
* How many records there are 
* How many credit cards are expired? 
* Most popular name or occupation for people with age between 40 and 65 

In [5]:
import json
import dask.bag as db
fldr = 'data_bag'

In [6]:
b = db.read_text(f'{fldr}/*.json').map(json.loads)

As we saw on the previous chapther b is a Dask object so in order to explore the data we could use b.compute() which works because the data in this case is small enough to fit in memory. Alternative we can use b.take(n) to print the first n records. From the output of b.take(4) we can see that these records contain credit-card information as age, name, profession, phone number and address. You might notice that in some cases the credit card is expired, the format for the phone number is not always the same and it might contain the international phone codes.

In [37]:
b.take(4)

({'age': 52,
  'name': ['Zulema', 'Walters'],
  'occupation': 'Pools Collector',
  'telephone': '949-313-2340',
  'address': {'address': '1159 Crestwell Pike', 'city': 'Providence'},
  'credit-card': {'number': '3485 248758 62500', 'expiration-date': '04/22'}},
 {'age': 25,
  'name': ['Claris', 'Chase'],
  'occupation': 'Clergyman',
  'telephone': '(484) 989-0982',
  'address': {'address': '303 Jason Spur', 'city': 'Kettering'},
  'credit-card': {'number': '4097 0322 1214 1781',
   'expiration-date': '08/16'}},
 {'age': 44,
  'name': ['Jorge', 'Klein'],
  'occupation': 'Building Control',
  'telephone': '1-619-049-6745',
  'address': {'address': '1145 Otis Esplanade', 'city': 'Hastings'},
  'credit-card': {'number': '5136 9375 5713 6925',
   'expiration-date': '08/18'}},
 {'age': 45,
  'name': ['Heriberto', 'Waller'],
  'occupation': 'Slaughterman',
  'telephone': '047.023.6435',
  'address': {'address': '25 Heron Point', 'city': 'Wausau'},
  'credit-card': {'number': '5312 6379 9376 4

In [39]:
N = b.count().compute()
N

10000

In [9]:
from datetime import datetime
record = b.take(4)[0]
record['credit-card']['expiration-date']

'04/22'

In [14]:
# date_lmt = datetime.today()
date_lmt = datetime(2020, 10 , 26)

In [15]:
def is_expired(record, date_lmt):
    exp_date = datetime.strptime(record['credit-card']['expiration-date'],
                  '%m/%y') 
    return exp_date < date_lmt

In [16]:
n_exp = b.filter(lambda record:
                 is_expired(record, today)).count().compute()

In [18]:
n_exp/N

0.4791

## Question 3

In [18]:
b_sample = db.from_sequence(b.take(10))

In [19]:
b_sample.filter(lambda record:
                record['age']>=40 and record['age']<=65).compute()

[{'age': 52,
  'name': ['Zulema', 'Walters'],
  'occupation': 'Pools Collector',
  'telephone': '949-313-2340',
  'address': {'address': '1159 Crestwell Pike', 'city': 'Providence'},
  'credit-card': {'number': '3485 248758 62500', 'expiration-date': '04/22'}},
 {'age': 44,
  'name': ['Jorge', 'Klein'],
  'occupation': 'Building Control',
  'telephone': '1-619-049-6745',
  'address': {'address': '1145 Otis Esplanade', 'city': 'Hastings'},
  'credit-card': {'number': '5136 9375 5713 6925',
   'expiration-date': '08/18'}},
 {'age': 45,
  'name': ['Heriberto', 'Waller'],
  'occupation': 'Slaughterman',
  'telephone': '047.023.6435',
  'address': {'address': '25 Heron Point', 'city': 'Wausau'},
  'credit-card': {'number': '5312 6379 9376 4977',
   'expiration-date': '05/19'}},
 {'age': 58,
  'name': ['Floria', 'Webb'],
  'occupation': 'Blinds Installer',
  'telephone': '(580) 532-0571',
  'address': {'address': '861 Chancery Bay', 'city': 'Evansville'},
  'credit-card': {'number': '5347 12

In [20]:
b_sample.filter(lambda record:
                record['age']>=40 and record['age']<=65)\
        .map(lambda record: record['occupation']).compute()

['Pools Collector',
 'Building Control',
 'Slaughterman',
 'Blinds Installer',
 'Milkman',
 'Entertainer']

In [64]:
b.frequencies?

[0;31mSignature:[0m [0mb[0m[0;34m.[0m[0mfrequencies[0m[0;34m([0m[0msplit_every[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m [0msort[0m[0;34m=[0m[0;32mFalse[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Count number of occurrences of each distinct element.

>>> b = from_sequence(['Alice', 'Bob', 'Alice'])
>>> dict(b.frequencies())  # doctest: +SKIP
{'Alice': 2, 'Bob', 1}
[0;31mFile:[0m      ~/.conda/envs/packt_dask/lib/python3.8/site-packages/dask/bag/core.py
[0;31mType:[0m      method


In [65]:
b_sample.filter(lambda record:
                record['age']>=40 and record['age']<=65)\
        .map(lambda record: record['occupation'])\
        .frequencies(sort=True).compute()

[('Pools Collector', 1),
 ('Building Control', 1),
 ('Slaughterman', 1),
 ('Blinds Installer', 1),
 ('Milkman', 1),
 ('Entertainer', 1)]

In [22]:
b_sample.filter(lambda record:
                record['age']>=40 and record['age']<=65)\
        .map(lambda record: record['occupation'])\
        .frequencies(sort=True)\
        .topk(3, key=1)\
        .compute()

[('Pools Collector', 1), ('Building Control', 1), ('Slaughterman', 1)]

In [28]:
b.filter(lambda record: record['age']>=40 and record['age']<=65)\
 .map(lambda record: record['occupation'])\
 .frequencies(sort=True)\
 .topk(3, key=1)\
 .compute()

[('Warehouseman', 17), ('Probation Worker', 13), ('Project Worker', 13)]

In [29]:
b.filter(lambda record:
         record['age']>=40 and record['age']<=65)\
 .map(lambda record: record['occupation'])\
 .frequencies(sort=True)\
 .topk(3, key=1)\
 .compute()

[('Warehouseman', 17), ('Probation Worker', 13), ('Project Worker', 13)]

In [30]:
b.filter(lambda record: record['age']>=40 and record['age']<=65)\
 .map(lambda record: record['occupation'])\
 .frequencies()\
 .topk(3, key=1)\
 .compute()

[('Warehouseman', 17), ('Probation Worker', 13), ('Project Worker', 13)]

In [32]:
b.filter(lambda record: record['age']>=40 and record['age']<=65)\
 .map(lambda record: record['occupation'])\
 .frequencies(sort=True)\
 .take(3)

(('Warehouseman', 17), ('Probation Worker', 13), ('Project Worker', 13))

In [73]:
b.filter(lambda record: record['age']>=40 and record['age']<=65)\
 .map(lambda record: record['name'][0])\
 .frequencies(sort=True)\
 .topk(10, key=1)\
 .compute()

[('Sang', 10),
 ('Mitchell', 9),
 ('Stephen', 9),
 ('Timothy', 8),
 ('Norman', 8),
 ('Moses', 7),
 ('Louie', 7),
 ('Frank', 7),
 ('Gerald', 7),
 ('Lorilee', 7)]

In [25]:
b.filter(lambda record:
         record['age']>=40 and record['age']<=65)\
 .map(lambda record: record['name'][1])\
 .frequencies(sort=True)\
 .topk(3, key=1)\
 .compute()

[('Reyes', 14), ('Cooper', 13), ('Roman', 13)]

In [26]:
b.filter(lambda record:
         record['age']>=40 and record['age']<=65)\
 .map(lambda record: record['name'][1])\
 .frequencies()\
 .topk(3, key=1)\
 .compute()

[('Reyes', 14), ('Cooper', 13), ('Roman', 13)]

In [24]:
b.topk?

[0;31mSignature:[0m [0mb[0m[0;34m.[0m[0mtopk[0m[0;34m([0m[0mk[0m[0;34m,[0m [0mkey[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m [0msplit_every[0m[0;34m=[0m[0;32mNone[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
K largest elements in collection

Optionally ordered by some key function

>>> b = from_sequence([10, 3, 5, 7, 11, 4])
>>> list(b.topk(2))  # doctest: +SKIP
[11, 10]

>>> list(b.topk(2, lambda x: -x))  # doctest: +SKIP
[3, 4]
[0;31mFile:[0m      ~/.conda/envs/packt_dask/lib/python3.8/site-packages/dask/bag/core.py
[0;31mType:[0m      method


# Example Get Data
Another possible example is exploit Dask parallelism to recover eartquakes data

In [3]:
import os
import dask.bag as db
import pandas as pd

In [4]:
fldr = "data_earthquakes"
os.makedirs(fldr, exist_ok=True)

In [5]:
urls = ['https://en.wikipedia.org/wiki/List_of_earthquakes_in_Italy',
        'https://en.wikipedia.org/wiki/List_of_earthquakes_in_Argentina',
        'https://en.wikipedia.org/wiki/List_of_earthquakes_in_Chile']

In [6]:
url = urls[0]

In [7]:
dfs = pd.read_html(url)

In [19]:
def get_eartquakes(url, fldr=None):
    fn = url.split("_")[-1]+".csv"
    if fldr is not None:
        fn = os.path.join(fldr, fn)
    dfs = pd.read_html(url)
    for df in dfs:
        cols = df.columns
        if "Mag." in cols:
            df.to_csv(fn, index=False)
            break

In [20]:
b = db.from_sequence(urls)

In [22]:
out = b.map(lambda url: get_eartquakes(url, fldr)).compute()