# Ipyparallel: Rev your CPUs!

### Starting engines
```
!ipcluster nbextension enable # doesn't seem to work anymore
!ipython profile create mycluster --parallel
!ipcluster start --n=4 --profile=mycluster # --daemon=True doesn't seem to work; run this in a terminal instead
```

In [1]:
import os
import ipyparallel as ipp


# Wait about 10 seconds after starting the cluster before running this cell;
# presumably this gives the engines time to register with the controller.
# NOTE(review): ipp.Client() with no arguments uses the default profile. If the
# cluster was started with --profile=mycluster, this likely needs
# ipp.Client(profile='mycluster') instead -- confirm.
client = ipp.Client()
print(client.ids)  # ids of the engines this client can reach
# Run os.getpid on every engine; distinct pids show they are separate processes.
ar = client[:].apply_async(os.getpid)
ar.get_dict()  # {engine_id: pid}; last expression, so the notebook displays it

[0, 1, 2, 3]


{0: 9856, 1: 6472, 2: 8320, 3: 3540}

### Direct View gets all the engines

In [2]:
# client[:] selects every engine, giving a DirectView.
direct_view = client[:]
# apply_sync blocks and returns a list with one result per engine.
print(direct_view.apply_sync(lambda : "Hello, World!"))
# apply_async returns immediately; printing shows the AsyncResult, not the values.
print(direct_view.apply_async(lambda : "Hello, World!"))
# get_dict() waits and maps each engine id to that engine's result.
print(direct_view.apply_async(lambda : "Hello, World!").get_dict())
# get() waits and returns the results as a list, like apply_sync.
print(direct_view.apply_async(lambda : "Hello, World!").get())

['Hello, World!', 'Hello, World!', 'Hello, World!', 'Hello, World!']
<AsyncResult: <lambda>>
{0: 'Hello, World!', 1: 'Hello, World!', 2: 'Hello, World!', 3: 'Hello, World!'}
['Hello, World!', 'Hello, World!', 'Hello, World!', 'Hello, World!']


In [3]:
print(direct_view.map_sync(lambda x: x ** 2, range(10)))
# `async` became a reserved keyword in Python 3.7, so the AsyncResult gets a
# different name; range() works on both Python 2 and 3, unlike xrange().
async_result = direct_view.map(lambda x: x ** 2, range(10))

# get_dict() keys the results by engine id, so with 10 inputs spread over 4
# engines it shows only one value per engine (see the output below).
print(async_result.get_dict())
print(async_result.get())  # waits for and returns all 10 results, in input order

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
{0: 0, 1: 1, 2: 4, 3: 9}
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]


### Push or pull objects into engines

In [4]:
# push() copies each key/value of the dict into every engine's namespace
# (dict-style assignment such as direct_view['b'] = 5 is another way to do this).
direct_view.push(dict(a=1.03234, b=3453))

<AsyncResult: _push>

In [5]:
# pull('a') fetches `a` from every engine; direct_view['a'] and client[:]['a'] do the same
direct_view.pull('a').get()

[1.03234, 1.03234, 1.03234, 1.03234]

In [6]:
client[0]['a'] = client[1]['a'] # copy `a` from engine 1 into engine 0

In [7]:
%%px
# %%px runs this cell on every engine: set `a` everywhere, then echo it
a = 5
a

[0;31mOut[0:1]: [0m5

[0;31mOut[1:1]: [0m5

[0;31mOut[2:1]: [0m5

[0;31mOut[3:1]: [0m5

In [8]:
# Set `a` on engine 0 only, then show `a` from every engine.
client[0]['a'] = 3
%px a

[0;31mOut[0:2]: [0m3

[0;31mOut[1:2]: [0m5

[0;31mOut[2:2]: [0m5

[0;31mOut[3:2]: [0m5

In [9]:
# direct_view is client[:] (a view over all engines), so this is the same as
# client[:]['b'] = 5. Indexing the client with one id, e.g. client[0], targets a single engine.
direct_view['b'] = 5

In [10]:
# client[::2] selects every other engine (ids 0 and 2 here). execute() runs the
# code string on them; each line below submits it again, so it runs twice.
print(client[::2].execute('c = a + b'))
print(client[::2].execute('c = a + b').get_dict())

<AsyncResult: execute>
{0: <ExecuteReply[2]: >, 2: <ExecuteReply[2]: >}


### Spread an iterable across engines and save it with a name

In [11]:
direct_view.scatter('some_variable', xrange(10)) # spread different numbers to each engine
print(direct_view.gather('some_variable')).get()
%px some_variable

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


[0;31mOut[0:3]: [0m[0, 1, 2]

[0;31mOut[1:3]: [0m[3, 4, 5]

[0;31mOut[2:3]: [0m[6, 7]

[0;31mOut[3:3]: [0m[8, 9]

In [12]:
%%time
%%px

# Runs on every engine. Still blocking: the cell returns only when the slowest
# engine finishes, and the remote loop cannot be stopped once started.
# some_variable is each engine's own list chunk from scatter() (e.g. [0, 1, 2] on engine 0).
for element in some_variable:
    counter = 0 
    while counter < element * 1e5:  # busy-wait proportional to element
        counter += 1
    print(element)

[stdout:0] 
0
1
2
[stdout:1] 
3
4
5
[stdout:2] 
6
7
[stdout:3] 
8
9
Wall time: 481 ms


In [13]:
%px %whos
# Engine namespaces persist across Jupyter Notebook kernel restarts;
# restart the ipcluster (or delete the names on the engines) to clear them.

[stdout:0] 
Variable        Type    Data/Info
---------------------------------
a               int     3
b               int     5
c               int     8
counter         int     200000
element         int     2
some_variable   list    n=3
[stdout:1] 
Variable        Type    Data/Info
---------------------------------
a               int     5
b               int     5
counter         int     500000
element         int     5
some_variable   list    n=3
[stdout:2] 
Variable        Type    Data/Info
---------------------------------
a               int     5
b               int     5
c               int     10
counter         int     700000
element         int     7
some_variable   list    n=2
[stdout:3] 
Variable        Type    Data/Info
---------------------------------
a               int     5
b               int     5
counter         int     900000
element         int     9
some_variable   list    n=2


### Asynchronous call

Sync vs. async is independent of the choice between direct_view and load_balanced_view.  
sync: blocks until the last engine is done, so all the results arrive at the same time  
async: returns immediately while the tasks are queued and processed. Call .get(timeout) to wait up to timeout seconds for the results (a TimeoutError is raised if they are not ready), and use .done() to check whether all engines have finished

direct_view: you manually control which engines do the work  
load_balanced_view: the scheduler decides which engine takes the next piece of the computation

In [14]:
%%time
import numpy as np

def myfunction(element):
    """Busy-wait for about ``element * 1e5`` increments, then print *element*.

    A CPU-bound dummy task for the load-balanced map; it returns ``None``,
    which is why the gathered results are a list of ``None``.
    """
    limit = element * 1e5
    ticks = 0
    while ticks < limit:
        ticks += 1
    print(element)

# A load-balanced view lets the scheduler hand each task to whichever engine is free.
load_balanced_view = client.load_balanced_view()
# NOTE(review): `async`, `xrange` and the print statements below are Python 2
# only (`async` is reserved from Python 3.7). The next cell reuses `async`,
# so rename it in both cells together when porting.
async = load_balanced_view.map_async(myfunction, xrange(50))


print async.elapsed # time elapsed since submission
print async.progress # number of tasks completed so far

# myfunction will not exist in each engine's namespace
# as it was only an argument passed into the map_async()

0.108
1
Wall time: 112 ms


In [15]:
print(async.done()) # True once every task has finished

#async.get(timeout=10) # give it up to 10 seconds before it either returns the results or TimeoutError
async.wait_interactive()  # block, showing progress, until all tasks finish
print(async.get(-1)) # wait until calculations are done and then return the results
# The results are all None because myfunction does not return anything.

  50/50 tasks finished after    8 s
done
[None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None]


In [None]:
# With the @parallel decorator, the function only needs to be defined here in
# the local (client) namespace rather than being put into each engine's namespace.
@load_balanced_view.parallel()
def silly_function(x):
    """Return ten times the fourth power of *x*, scaled by the float ``10.0``."""
    fourth_power = x ** 4
    return 10.0 * fourth_power

# map() runs silly_function over range(32) in parallel across the engines;
# .get() waits and returns the results as a list (in input order, per the output below).
silly_function.map(range(32)).get()

[0.0,
 10.0,
 160.0,
 810.0,
 2560.0,
 6250.0,
 12960.0,
 24010.0,
 40960.0,
 65610.0,
 100000.0,
 146410.0,
 207360.0,
 285610.0,
 384160.0,
 506250.0,
 655360.0,
 835210.0,
 1049760.0,
 1303210.0,
 1600000.0,
 1944810.0,
 2342560.0,
 2798410.0,
 3317760.0,
 3906250.0,
 4569760.0,
 5314410.0,
 6146560.0,
 7072810.0,
 8100000.0,
 9235210.0]

In [None]:
%%time
from IPython.display import clear_output
from tqdm import tqdm

# ordered=False: results are handed back as tasks finish, not in submission order.
# chunksize=1: each task carries a single (num1, num2) pair.
@load_balanced_view.parallel(ordered=False, chunksize=1)
def delay(num1, num2):
    """Sleep for *num1* seconds on an engine, then return ``num1 + num2``."""
    import time  # imported in the body so it is available when this runs on an engine
    time.sleep(num1)
    total = num1 + num2
    return total

# Pass one sequence per argument; they are paired element-wise, so they must
# have the same length. (`async` is reserved in Python 3.7+, hence `async_result`.)
async_result = delay.map(range(10), [0.1] * 10)
for engine_result in tqdm(async_result):
    clear_output()  # clear the cell's output area before showing the newest result
    print(engine_result)

print(async_result.get())

1.1


 20%|█████████████▏                                                    | 2/10 [00:01<00:04,  1.88it/s]

### Clear namespace

In [None]:
%%px
del a, b, counter, element, some_variable
try:
    del c
except NameError:
    pass
%whos

## Stopping engines

`!ipcluster stop`

# Comparing Word Count with ipyparallel, MRJob, and single CPU

## Word Count Example with ipyparallel

In [None]:
%%time
# pass function into load_balanced view
from glob import glob
from collections import Counter

def word_counter(file_name):
    """Return a Counter of the lower-cased, whitespace-separated words in *file_name*."""
    from collections import Counter  # imported here so it exists when this runs on an engine

    with open(file_name) as handle:
        words = (word for line in handle for word in line.lower().split())
        return Counter(words)

num_pieces = 5
!split --number=l/{num_pieces} encyclopedia_britannica.txt temp_file.

async = load_balanced_view.map(word_counter, 
    glob("/Users/Eugene/Desktop/Repos/ipyparallel/temp_file*"))

global_counter1 = Counter() 
for engine_result in async:
    global_counter1.update(engine_result)
!rm temp_file*

In [None]:
%%time
# Decorated with load_balanced_view.parallel, so it is called through word_counter.map().
# ordered=False hands results back as they finish; chunksize=1 sends one file per task.
@load_balanced_view.parallel(ordered=False, chunksize=1)
def word_counter(file_name):
    """Return a Counter of the lower-cased, whitespace-separated words in *file_name*."""
    from collections import Counter  # imported here so it exists when this runs on an engine

    with open(file_name) as handle:
        words = (word for line in handle for word in line.lower().split())
        return Counter(words)

num_pieces = 5
!split --number=l/{num_pieces} encyclopedia_britannica.txt temp_file.

global_counter2 = Counter()
    
file_names = glob("/Users/Eugene/Desktop/Repos/ipyparallel/temp_file*")
async = word_counter.map(file_names) # need to write map

for engine_result in async:
    global_counter2.update(engine_result)
    
!rm temp_file*

In [None]:
# True if both ipyparallel variants produced the same word counts.
global_counter1 == global_counter2

In [None]:
# Shut down the cluster started for the ipyparallel examples above.
!ipcluster stop

# MRJob Version of Word Count

In [None]:
%%writefile mr_word_counter.py
from mrjob.job import MRJob


class MRWordFrequencyCount(MRJob):
    """Count lower-cased, whitespace-separated words in the input lines."""

    def mapper(self, _, line):
        """Emit ``(word, 1)`` for every lower-cased token on *line*."""
        for token in line.lower().split():
            yield token, 1

    def combiner(self, word, aggregated_counts):
        """Partially sum the counts for *word* before they reach the reducer."""
        subtotal = sum(aggregated_counts)
        yield word, subtotal

    def reducer(self, key, count):
        """Sum every partial count for *key*; *count* is an iterable of ints."""
        total = sum(count)
        yield key, total


if __name__ == '__main__':
    MRWordFrequencyCount.run()

In [None]:
%%time
# Run the MRJob word count locally: the corpus is read from stdin and the
# tab-separated "key<TAB>count" output lines are written to a temp file.
!python mr_word_counter.py < encyclopedia_britannica.txt > temp_encyclopedia_counter_results.txt

# Show the 20 most frequent words: sort numerically on the second field (the count), descending.
!cat temp_encyclopedia_counter_results.txt | sort --key 2nr -n | head -20

## Comparison of ipyparallel, MRJob, Manual Counter

In [None]:
# ipyparallel version: the 10 most frequent words
print(global_counter1.most_common()[:10])

In [None]:
# MRjob version
from collections import Counter

counter_mrjob = Counter()

with open('temp_encyclopedia_counter_results.txt') as f:
    for line in f:
        word, count = line.strip().split('\t')
        counter_mrjob[word.strip('"')] = int(count)

print(counter_mrjob.most_common()[:10])

!rm temp_encyclopedia_counter_results.txt

In [None]:
%%time
# Single-process baseline: count every lower-cased, whitespace-separated word directly.
counter_manual = Counter()
with open('encyclopedia_britannica.txt') as f:
    for line in f:
        counter_manual.update(line.lower().split())

print(counter_manual.most_common()[:10])

In [None]:
print(global_counter1 == counter_manual)  # ipyparallel counts vs. the single-process counts
# Counter subtraction keeps only positive differences, i.e. words MRJob counted
# fewer times than the manual counter. The subtraction is parenthesized inside
# print() so .most_common() is called on the Counter, not on print's result.
print((counter_manual - counter_mrjob).most_common()[:10])

### Conclusion:
For this word count example, ipyparallel appears to be faster than MRJob. However, both seem to lose out to the manual word counter (a single CPU process)...  
Perhaps this file is not large enough to merit multiple processors.