# Start Dask Client for Dashboard

In [1]:
from dask.distributed import Client, progress
client = Client(n_workers=4, threads_per_worker=1)
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 33683 instead


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:33683/status,

0,1
Dashboard: http://127.0.0.1:33683/status,Workers: 4
Total threads: 4,Total memory: 7.69 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:36867,Workers: 4
Dashboard: http://127.0.0.1:33683/status,Total threads: 4
Started: Just now,Total memory: 7.69 GiB

0,1
Comm: tcp://127.0.0.1:37143,Total threads: 1
Dashboard: http://127.0.0.1:40817/status,Memory: 1.92 GiB
Nanny: tcp://127.0.0.1:36335,
Local directory: /tmp/dask-scratch-space/worker-ftm597ly,Local directory: /tmp/dask-scratch-space/worker-ftm597ly

0,1
Comm: tcp://127.0.0.1:38761,Total threads: 1
Dashboard: http://127.0.0.1:39819/status,Memory: 1.92 GiB
Nanny: tcp://127.0.0.1:33181,
Local directory: /tmp/dask-scratch-space/worker-c79k6mc0,Local directory: /tmp/dask-scratch-space/worker-c79k6mc0

0,1
Comm: tcp://127.0.0.1:36377,Total threads: 1
Dashboard: http://127.0.0.1:45187/status,Memory: 1.92 GiB
Nanny: tcp://127.0.0.1:41153,
Local directory: /tmp/dask-scratch-space/worker-nv6tqpaq,Local directory: /tmp/dask-scratch-space/worker-nv6tqpaq

0,1
Comm: tcp://127.0.0.1:45509,Total threads: 1
Dashboard: http://127.0.0.1:37281/status,Memory: 1.92 GiB
Nanny: tcp://127.0.0.1:35433,
Local directory: /tmp/dask-scratch-space/worker-d141eceu,Local directory: /tmp/dask-scratch-space/worker-d141eceu


In [3]:
import dask
import json
import os

os.makedirs('data', exist_ok=True)              # Create data/ directory

b = dask.datasets.make_people()                 # Make records of people
b.map(json.dumps).to_textfiles('data/*.json')   # Encode as JSON, write to disk

['/home/carlos/PycharmProjects/cbnetwork/libraries_examples/dask/local/data/0.json',
 '/home/carlos/PycharmProjects/cbnetwork/libraries_examples/dask/local/data/1.json',
 '/home/carlos/PycharmProjects/cbnetwork/libraries_examples/dask/local/data/2.json',
 '/home/carlos/PycharmProjects/cbnetwork/libraries_examples/dask/local/data/3.json',
 '/home/carlos/PycharmProjects/cbnetwork/libraries_examples/dask/local/data/4.json',
 '/home/carlos/PycharmProjects/cbnetwork/libraries_examples/dask/local/data/5.json',
 '/home/carlos/PycharmProjects/cbnetwork/libraries_examples/dask/local/data/6.json',
 '/home/carlos/PycharmProjects/cbnetwork/libraries_examples/dask/local/data/7.json',
 '/home/carlos/PycharmProjects/cbnetwork/libraries_examples/dask/local/data/8.json',
 '/home/carlos/PycharmProjects/cbnetwork/libraries_examples/dask/local/data/9.json']

# Read JSON data

In [4]:
!head -n 2 data/0.json

{"age": 1, "name": ["Jeromy", "Jordan"], "occupation": "Horticultural Consultant", "telephone": "+1-239-070-6832", "address": {"address": "28 Lee Square", "city": "Redondo Beach"}, "credit-card": {"number": "4305 7922 0181 3652", "expiration-date": "10/19"}}
{"age": 38, "name": ["Porsha", "O'brien"], "occupation": "Medical Consultant", "telephone": "+1-201-354-9454", "address": {"address": "1222 Escondido Expressway", "city": "Bellaire"}, "credit-card": {"number": "4508 6491 2587 5776", "expiration-date": "05/24"}}


In [5]:
import dask.bag as db
import json

b = db.read_text('data/*.json').map(json.loads)
b

dask.bag<loads, npartitions=10>

In [6]:
b.take(2)

({'age': 1,
  'name': ['Jeromy', 'Jordan'],
  'occupation': 'Horticultural Consultant',
  'telephone': '+1-239-070-6832',
  'address': {'address': '28 Lee Square', 'city': 'Redondo Beach'},
  'credit-card': {'number': '4305 7922 0181 3652',
   'expiration-date': '10/19'}},
 {'age': 38,
  'name': ['Porsha', "O'brien"],
  'occupation': 'Medical Consultant',
  'telephone': '+1-201-354-9454',
  'address': {'address': '1222 Escondido Expressway', 'city': 'Bellaire'},
  'credit-card': {'number': '4508 6491 2587 5776',
   'expiration-date': '05/24'}})

# Map, Filter, Aggregate

In [7]:
b.filter(lambda record: record['age'] > 30).take(2)  # Select only people over 30

({'age': 38,
  'name': ['Porsha', "O'brien"],
  'occupation': 'Medical Consultant',
  'telephone': '+1-201-354-9454',
  'address': {'address': '1222 Escondido Expressway', 'city': 'Bellaire'},
  'credit-card': {'number': '4508 6491 2587 5776',
   'expiration-date': '05/24'}},
 {'age': 60,
  'name': ['Jessia', 'Mclean'],
  'occupation': 'Caretaker',
  'telephone': '+1-764-835-6634',
  'address': {'address': '136 Storey Canyon', 'city': 'Santa Monica'},
  'credit-card': {'number': '5507 3917 1578 9947',
   'expiration-date': '02/23'}})

In [8]:
b.map(lambda record: record['occupation']).take(2)  # Select the occupation field

('Horticultural Consultant', 'Medical Consultant')

In [9]:
b.count().compute()

10000

# Chain computations

In [10]:
result = (b.filter(lambda record: record['age'] > 30)
           .map(lambda record: record['occupation'])
           .frequencies(sort=True)
           .topk(10, key=1))
result

dask.bag<topk-aggregate, npartitions=1>

In [11]:
result.compute()

[('Mineralologist', 16),
 ('Potter', 16),
 ('Stud Hand', 16),
 ('Valve Technician', 15),
 ('Press Operator', 15),
 ('Marine Consultant', 15),
 ('Dealer', 15),
 ('Masseur', 15),
 ('Staff Nurse', 14),
 ('Transcriber', 14)]

# Transform and Store

In [12]:
(b.filter(lambda record: record['age'] > 30)  # Select records of interest
  .map(json.dumps)                            # Convert Python objects to text
  .to_textfiles('data/processed.*.json'))

['/home/carlos/PycharmProjects/cbnetwork/libraries_examples/dask/local/data/processed.0.json',
 '/home/carlos/PycharmProjects/cbnetwork/libraries_examples/dask/local/data/processed.1.json',
 '/home/carlos/PycharmProjects/cbnetwork/libraries_examples/dask/local/data/processed.2.json',
 '/home/carlos/PycharmProjects/cbnetwork/libraries_examples/dask/local/data/processed.3.json',
 '/home/carlos/PycharmProjects/cbnetwork/libraries_examples/dask/local/data/processed.4.json',
 '/home/carlos/PycharmProjects/cbnetwork/libraries_examples/dask/local/data/processed.5.json',
 '/home/carlos/PycharmProjects/cbnetwork/libraries_examples/dask/local/data/processed.6.json',
 '/home/carlos/PycharmProjects/cbnetwork/libraries_examples/dask/local/data/processed.7.json',
 '/home/carlos/PycharmProjects/cbnetwork/libraries_examples/dask/local/data/processed.8.json',
 '/home/carlos/PycharmProjects/cbnetwork/libraries_examples/dask/local/data/processed.9.json']

# Convert to Dask Dataframes

In [13]:
b.take(1)

({'age': 1,
  'name': ['Jeromy', 'Jordan'],
  'occupation': 'Horticultural Consultant',
  'telephone': '+1-239-070-6832',
  'address': {'address': '28 Lee Square', 'city': 'Redondo Beach'},
  'credit-card': {'number': '4305 7922 0181 3652',
   'expiration-date': '10/19'}},)

In [14]:
def flatten(record):
    return {
        'age': record['age'],
        'occupation': record['occupation'],
        'telephone': record['telephone'],
        'credit-card-number': record['credit-card']['number'],
        'credit-card-expiration': record['credit-card']['expiration-date'],
        'name': ' '.join(record['name']),
        'street-address': record['address']['address'],
        'city': record['address']['city']
    }

b.map(flatten).take(1)

({'age': 1,
  'occupation': 'Horticultural Consultant',
  'telephone': '+1-239-070-6832',
  'credit-card-number': '4305 7922 0181 3652',
  'credit-card-expiration': '10/19',
  'name': 'Jeromy Jordan',
  'street-address': '28 Lee Square',
  'city': 'Redondo Beach'},)

In [15]:
df = b.map(flatten).to_dataframe()
df.head()

Unnamed: 0,age,occupation,telephone,credit-card-number,credit-card-expiration,name,street-address,city
0,1,Horticultural Consultant,+1-239-070-6832,4305 7922 0181 3652,10/19,Jeromy Jordan,28 Lee Square,Redondo Beach
1,38,Medical Consultant,+1-201-354-9454,4508 6491 2587 5776,05/24,Porsha O'brien,1222 Escondido Expressway,Bellaire
2,60,Caretaker,+1-764-835-6634,5507 3917 1578 9947,02/23,Jessia Mclean,136 Storey Canyon,Santa Monica
3,81,Preacher,+13312648010,4763 3961 5554 5966,10/17,Noah Burgess,1334 Greely Rapids,Hudson
4,76,Airport Controller,+12819619334,4160 2801 0380 8172,10/25,Garfield Leonard,1266 Hale Cove,Morristown


In [16]:
df[df.age > 30].occupation.value_counts().nlargest(10).compute()

occupation
Mineralologist              16
Stud Hand                   16
Potter                      16
Dealer                      15
Press Operator              15
Marine Consultant           15
Valve Technician            15
Masseur                     15
Administration Assistant    14
Loans Manager               14
Name: count, dtype: int64