In [None]:
!pip install "dask[distributed]" --upgrade

# Simple Dask Example

In [None]:
import dask.bag as db

# First, let's create some simple data: the integers from 0 to 999
# (range(1000) stops before 1000). In practice the data would most likely come
# from data files.
A = range(1000)

# Now distribute this data across the partitions of a Dask bag using
# db.from_sequence.
bA = db.from_sequence(A)

# Start with a simple task: compute the sum of the values with plain Python.
expected_sum = sum(A)
print(expected_sum)

# To do the same on the distributed bag, call Bag.reduction with two functions:
# - the first reduces all values within a single partition;
# - the second reduces the per-partition results to a single value.
total = bA.reduction(lambda part: sum(part), lambda partials: sum(partials)).compute()
print(total)

# Equivalent, because sum is already a function that takes a sequence and
# produces a single numerical result.
total_builtin = bA.reduction(sum, sum).compute()
print(total_builtin)

# Sanity check: the distributed results must agree with plain Python.
assert total == total_builtin == expected_sum
# Dask will distribute the reduction across available processes.


499500
499500
499500


# Word Count

In [None]:
# Let's try a more complex example: word count, working with files.
# First download the text of Peter Pan from Project Gutenberg.
# Done in pure Python (instead of `!wget -q`) so that a failed download raises
# an error in this cell instead of passing silently, and so the file is only
# fetched once even if the cell is re-run.
from pathlib import Path
from urllib.request import urlretrieve

PETER_PAN_URL = "https://www.gutenberg.org/files/16/16-0.txt"
PETER_PAN_PATH = Path("peterpan.txt")

if not PETER_PAN_PATH.exists():
    urlretrieve(PETER_PAN_URL, PETER_PAN_PATH)

In [None]:
TOP_N = 10  # how many of the most frequent words to display

# Load the file into a bag of lines.
lines = db.read_text("peterpan.txt")

# Canonical WordCount example: split each line into (word, 1) pairs.
# str.split() with no argument splits on any run of whitespace and drops the
# trailing newline, so "\n" and "the\n" are not counted as separate "words".
# Punctuation is kept, so e.g. "said" and "said," are still counted separately.
word_pairs = lines.map(lambda line: [(word.lower(), 1) for word in line.split()]).flatten()

# Group by the word and do the reduction at the same time.
# - binop folds one (word, 1) pair into a running total within a partition;
# - combine merges the integer totals coming from different partitions.
#   Without combine, foldby falls back to binop for that step, which would try
#   to index an int (pair[1]) as soon as the bag has more than one partition.
grouped_counts = word_pairs.foldby(
    key=lambda pair: pair[0],
    binop=lambda total, pair: total + pair[1],
    initial=0,
    combine=lambda total_a, total_b: total_a + total_b,
    combine_initial=0,
)

# Show the most frequent words.
print(grouped_counts.topk(TOP_N, key=1).compute())


# Simpler: the built-in frequencies() transformation does the grouping for us.
# Split each line into lower-cased words.
words = lines.map(lambda line: [word.lower() for word in line.split()]).flatten()
# Count word frequencies; topk already orders the result, so no sort is needed.
counts = words.frequencies()
print(counts.topk(TOP_N, key=1).compute())

[('the', 2281), ('\n', 1813), ('and', 1322), ('to', 1161), ('he', 974), ('of', 897), ('a', 879), ('was', 853), ('in', 673), ('it', 555)]
[('the', 2281), ('\n', 1813), ('and', 1322), ('to', 1161), ('he', 974), ('of', 897), ('a', 879), ('was', 853), ('in', 673), ('it', 555), ('that', 543), ('she', 540), ('they', 528), ('had', 467), ('you', 431), ('but', 427), ('his', 423), ('for', 378), ('not', 372), ('with', 353), ('her', 342), ('is', 321), ('as', 316), ('at', 314), ('on', 308), ('i', 242), ('have', 239), ('peter', 232), ('be', 227), ('were', 224), ('all', 218), ('the\n', 214), ('said', 212), ('this', 205), ('so', 205), ('wendy', 198), ('would', 194), ('their', 193), ('are', 183), ('by', 180), ('him', 174), ('one', 162), ('when', 158), ('“i', 157), ('them', 155), ('', 152), ('there', 152), ('if', 151), ('we', 145), ('from', 139), ('no', 138), ('could', 131), ('or', 130), ('which', 123), ('been', 123), ('who', 121), ('what', 120), ('did', 112), ('out', 109), ('do', 107), ('said,', 105), 

# With subprocesses

In [None]:
from dask.distributed import Client

# Re-running this cell would otherwise start another local cluster without
# shutting down the one created by the previous run, so close that first.
previous_client = globals().get("client")
if previous_client is not None:
    previous_client.close()

# This line starts a Dask client with 4 single-threaded worker processes.
# The Client registers itself as the default Dask scheduler, and so runs all
# dask collections like dask.array, dask.bag, dask.dataframe and dask.delayed.
# See https://distributed.dask.org/en/latest/client.html for more details.
client = Client(n_workers=4, threads_per_worker=1)
print(client)

# Our word count will now "automatically" run on the worker sub-processes of
# the cluster referenced by client.
lines = db.read_text("peterpan.txt")
# str.split() splits on any whitespace and drops the trailing newline, so "\n"
# is not counted as a word.
words = lines.map(lambda line: [word.lower() for word in line.split()]).flatten()
# Count word frequencies and show the 10 most common.
counts = words.frequencies()
print(counts.topk(10, key=1).compute())



Perhaps you already have a cluster running?
Hosting the HTTP server on port 35769 instead
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:40903
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:35769/status
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:35441'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:33415'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:36293'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:39831'
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:44481', name: 1, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:44481
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:44070
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:37979', name: 0, status: init, memory: 0, processing: 0>
INFO:distributed.schedul

<Client: 'tcp://127.0.0.1:40903' processes=4 threads=4, memory=12.67 GiB>
[('the', 2281), ('\n', 1813), ('and', 1322), ('to', 1161), ('he', 974), ('of', 897), ('a', 879), ('was', 853), ('in', 673), ('it', 555)]
