In [None]:
!cat ../fib.py

In [None]:
from fib import fibonacci
fibonacci(32)

In [None]:
!fib 32

In [None]:
!fib -l 20
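
The contents of `fib.py` are not reproduced in this text. As a stand-in, here is a minimal sketch of what such a module might look like, assuming a naive recursive implementation (deliberately slow, which makes it a handy CPU-bound workload) and assuming that `-l` prints every value up to N, one per line. The `main` function and the exact behaviour of the flag are guesses, not the actual file.

```python
import argparse


def fibonacci(n):
    """Naive recursive Fibonacci: exponential time, intentionally CPU-heavy."""
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)


def main(argv=None):
    # Hypothetical CLI: `fib N` prints one value, `fib -l N` prints values 0..N.
    parser = argparse.ArgumentParser(prog="fib")
    parser.add_argument("n", type=int)
    parser.add_argument("-l", "--list", action="store_true")
    args = parser.parse_args(argv)
    if args.list:
        for i in range(args.n + 1):
            print(fibonacci(i))
    else:
        print(fibonacci(args.n))
```

Registering something like `main` as a console entry point is one way a `fib` command could end up on the shell path.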

## Subprocess - a poor man's multiprocessing

The [subprocess](https://docs.python.org/3/library/subprocess.html#module-subprocess) module allows you to spawn new processes, connect to their input/output/error pipes, and obtain their return codes. Subprocess is a built-in module, so you should not need to install it. Simply ```import subprocess```

**subprocess allows you to run shell commands from python**

In [None]:
import subprocess

```subprocess.check_output(...)``` runs a command and returns its output as bytes.

In [None]:
subprocess.check_output(["ls", "/"])

### A few things to note:

+ In Python 3 b'...' means the return value is bytes. Python 2 assumed that the return value was a string, which leads to all kinds of problems if what you're working with are actually bytes.
+ The output contains a few special characters, specifically '\n'. '\n' means "newline" (line feed) and you can treat it like a normal character in Python.

Python makes it easy to convert bytes into a string (assuming the bytes are a string!). To do so we can add .decode(...)

In [None]:
subprocess.check_output(["ls", "/"]).decode('utf-8')

We can split up the output 

In [None]:
subprocess.check_output(["ls", "/"]).decode('utf-8').split("\n")
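
One detail worth knowing: command output usually ends with a newline, so `split("\n")` leaves an empty string at the end of the list. A small sketch, using a made-up bytes value as a stand-in for real `ls` output, shows the difference with `splitlines()`:

```python
raw = b"bin\nboot\ndev\n"  # stand-in for the bytes returned by check_output

text = raw.decode("utf-8")
with_split = text.split("\n")        # keeps a trailing empty string
with_splitlines = text.splitlines()  # drops it

print(with_split)
print(with_splitlines)
```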

With ```subprocess.check_output(...)``` we can call our ```fib``` executable directly from Python

In [None]:
subprocess.check_output(["fib", "20"]).decode('utf-8')

In [None]:
subprocess.check_output(["fib", "-l", "20"]).decode('utf-8').split("\n")

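```subprocess.check_output(...)``` returns the output. ```subprocess.check_call(...)``` returns the "return code." This is an operating-system-level code where 0 is (almost) always good, and any other number means failure. If the command exits with a non-zero code, ```check_call``` raises a ```CalledProcessError``` instead of returning.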

In [None]:
subprocess.check_call(["fib", "20"])

In [None]:
subprocess.check_call(["fib", "not", "a", "command"])
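
When the command exits with a non-zero code, `check_call` and `check_output` raise `subprocess.CalledProcessError` rather than returning. A minimal sketch of handling that, using the current Python interpreter as the child command so that it runs without the `fib` executable (the exit code 3 is arbitrary):

```python
import subprocess
import sys

# A child process that deliberately exits with code 3.
failing_cmd = [sys.executable, "-c", "import sys; sys.exit(3)"]

try:
    subprocess.check_call(failing_cmd)
    returncode = 0
except subprocess.CalledProcessError as err:
    # The exception carries the command and its return code.
    returncode = err.returncode

print(returncode)
```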

```subprocess.check_output(...)``` and ```subprocess.check_call(...)```
are both functions that wrap a lower-level class called ```Popen```. Until now, each of the functions in subprocess has handed its computation to the operating system and waited. Popen (the 'low-level' API) runs its process in the background.

In [None]:
p = subprocess.Popen(["fib", "-l", "32"], stdout=subprocess.PIPE)

The process is now running, but the notebook can execute additional cells, so you are free to go on doing other work. You can tell when the process is finished by calling ```p.poll()```: if it returns None the process is not finished; if it returns 0 (the return code) it completed successfully.

In [None]:
p.poll() is not None
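
If you want to wait for the process while still doing other work between checks, a polling loop is one option. A minimal sketch, using a short-lived Python child process as a stand-in for `fib` (the sleep durations are arbitrary):

```python
import subprocess
import sys
import time

# Child that sleeps briefly, standing in for a long-running `fib` call.
p = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(0.5)"])

polls = 0
while p.poll() is None:  # None means "still running"
    polls += 1           # other work could happen here
    time.sleep(0.1)

print("finished with return code", p.returncode, "after", polls, "polls")
```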

You can get the output from the ```stdout``` attribute of the process. But careful! You can only read that data once! Make sure to put it in a variable if you want to hang onto it.

In [None]:
p.stdout.read()

In [None]:
p.stdout.read()
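
The second read returns an empty bytes object because the pipe has already been drained. Here is a sketch of the same behaviour, followed by `communicate()`, which waits for the process and returns everything in one call (the child command is a stand-in so the cell runs without `fib`):

```python
import subprocess
import sys

cmd = [sys.executable, "-c", "print('hello')"]

p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
first = p.stdout.read()   # all of the output
second = p.stdout.read()  # b'' because nothing is left
p.wait()
p.stdout.close()

# communicate() reads to end-of-file and waits for exit in one step.
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
out, _ = p.communicate()
```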

Using Popen() it is possible to run as many commands as you want

In [None]:
p1 = subprocess.Popen(["fib", "-l", "34"], stdout=subprocess.PIPE)
p2 = subprocess.Popen(["fib", "-l", "34"], stdout=subprocess.PIPE)
p3 = subprocess.Popen(["fib", "-l", "34"], stdout=subprocess.PIPE)

In [None]:
p1.poll() == 0 and p2.poll() == 0 and p3.poll() == 0

Just for fun, it is possible to read the output of a command as it is produced and print each value. Unfortunately, reading like this forces the cell to 'block.' It is possible to not block, still get output, and handle several processes, but that is outside the scope of this presentation. If you're interested, take a look at the built-in module [select](https://docs.python.org/3/library/select.html)

In [None]:
p = subprocess.Popen(["fib", "-l", "34"], stdout=subprocess.PIPE)
for line in iter(p.stdout.readline, b''):
    print(line)
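
A variation on the cell above: passing `text=True` (Python 3.7+) makes the pipe yield decoded strings, and iterating over `p.stdout` gives one line at a time. This is a sketch with a stand-in child process that prints three lines; it still blocks while reading.

```python
import subprocess
import sys

child_code = "for i in range(3): print(i, flush=True)"

lines = []
# The context manager closes the pipe and waits for the child on exit.
with subprocess.Popen(
    [sys.executable, "-c", child_code],
    stdout=subprocess.PIPE,
    text=True,  # decode bytes to str for us
) as p:
    for line in p.stdout:  # yields one line at a time as it arrives
        lines.append(line.rstrip("\n"))

print(lines)
```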

## Multiprocessing

Multiprocessing gives us many more options than ```subprocess```. Multiprocessing works with functions rather than command-line commands; this means we can write a function, pass it arguments, and smear it over a bunch of processors to get things done.

In [None]:
import multiprocessing as mp
from fib import fibonacci

Pools are a much more convenient way to distribute a function across a number of arguments. 

In [None]:
pool = mp.Pool(3)

result = pool.apply_async(fibonacci, (32, ))

In [None]:
result

In [None]:
result.get()

In [None]:
pool = mp.Pool(3)
jobs = []

for value in [32, 33, 34]:
    result = pool.apply_async(fibonacci, (value, ))
    jobs.append(result)

In [None]:
jobs

In [None]:
[j.get() for j in jobs]

The multiprocessing ```map(...)``` function is quite useful. Note that the map function blocks until all of the results are ready.

In [None]:
pool = mp.Pool(3)
values = pool.map(fibonacci, [34, 34, 34])

In [None]:
values

There are a few 'gotchas'. In particular, functions must be importable from the '\_\_main\_\_' module. Practically, it means that while the following is possible from a Jupyter Notebook, **it will not work in an ipython interpreter**. It will, however, work from a script that you run from the command line. This is because the function ```double``` is defined inside the REPL and the processes do not have access to it!

In [None]:
def double(x):
    return x * 2

pool = mp.Pool(5)

print(pool.map(double, [1, 2, 3]))
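
For the script case, a minimal sketch might look like the following. The function is defined at module level so worker processes can import it, and the pool is created under the `if __name__ == "__main__":` guard, which the multiprocessing documentation recommends so that importing the module in a child process does not start another pool. The file name `double_pool.py` and the helper `parallel_double` are made up for illustration.

```python
# double_pool.py (hypothetical file name)
import multiprocessing as mp


def double(x):
    return x * 2


def parallel_double(values, workers=3):
    # The context manager terminates the pool when the block exits;
    # map() has already returned all results by then.
    with mp.Pool(workers) as pool:
        return pool.map(double, values)


if __name__ == "__main__":
    print(parallel_double([1, 2, 3]))
```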

Multiprocessing provides many mechanisms for synchronising between processes including Queues, Pipes, Locks, Semaphores, Events and shared memory between processes via Values and Arrays. These are interesting tools that allow you to coordinate between processes, but they also drastically increase the complexity of any program that uses them. Because of this I won't cover them in this presentation. For more information check out the [multiprocessing](https://docs.python.org/3.6/library/multiprocessing.html) documentation.

Here is a quick example of using multiprocessing to run a subprocess that returns output. Recall that ```subprocess.check_output(...)``` 'blocks' until it returns a value (the output of the shell command). By wrapping the subprocess call in a call to ```pool.map(...)``` we launch each function call in its own process, which calls ```subprocess.check_output(...)``` and waits for the output to return. The entire cell blocks until ```pool.map(...)``` has collected every result. This should only take about as long as the _longest_ running function call.

In [None]:
import multiprocessing as mp
import subprocess

def run_fib(N):
    return subprocess.check_output(["fib", str(N)]).decode('utf-8')

pool = mp.Pool(3)

pool.map(run_fib, [32, 33, 34])

# IPython Parallel

To get IPython Parallel up and running we have to start a cluster. The easiest way to do this is by running the following command:

```
ipcluster start -n 4
```

The '4' above indicates that we want to have 4 "engines" or "workers." This allows us to run the cluster locally, using as many processors as we have available.


In [None]:
import ipyparallel as ipp
from fib import fibonacci

Create a "Client"

In [None]:
c = ipp.Client()

In [None]:
c.ids

Create a Direct View 

In [None]:
dview = c[:]

Parallel Map

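```map_sync()``` applies a function to every element of a sequence, spreading the work across the engines, and blocks until all of the results have been returned.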

In [None]:
serial = list(map(fibonacci, range(25)))

In [None]:
parallel = dview.map_sync(fibonacci, range(25))

In [None]:
serial == parallel

### Timing map_sync
Timing a run of 4 fibonacci numbers,  serial vs. parallel

In [None]:
N = 34

In [None]:
%%timeit -n1 -r1
list(map(fibonacci, [N, N, N, N]))

In [None]:
%%timeit -n1 -r1
dview.map_sync(fibonacci, [N, N, N, N])

### Remote function decorators

In [None]:
@dview.remote(block=True)
def remote_fib(N):
    from fib import fibonacci
    return fibonacci(N)

Not what you might expect!

In [None]:
remote_fib(32)

This didn't run ```remote_fib(32)``` on a single worker; it ran the **same** function on each worker!

### Push/Pull

In [None]:
dview['some_var'] = 32

In [None]:
@dview.remote(block=True)
def return_some_var():
    global some_var
    return some_var
    
return_some_var()

In [None]:
@dview.remote(block=True)
def change_some_var():
    global some_var
    import random
    some_var = some_var + random.randint(1, 5)
    
change_some_var()

In [None]:
dview['some_var']

### Syncing imports

Importing the necessary libraries can be quite annoying if you're defining many functions. It is possible to sync imports using a context manager.

In [None]:
with dview.sync_imports():
    from fib import fibonacci
    import random

In [None]:
@dview.remote(block=True)
def remote_fib_better():
    global seed_var
    N = seed_var + random.randint(1, 4)

    return (N, fibonacci(N))


In [None]:
dview['seed_var'] = 25

remote_fib_better()

### Scatter & Gather

In [None]:
numbers = list(range(32))
random.shuffle(numbers)

In [None]:
numbers

In [None]:
dview.scatter('Ns', numbers)
dview['Ns']

In [None]:
@dview.remote(block=True)
def scattered_fib():
    global Ns
    Ns = [fibonacci(n) for n in Ns]
    
scattered_fib()

In [None]:
dview.gather('Ns').get()
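
Conceptually, `scatter` partitions a sequence into one chunk per engine and `gather` concatenates the chunks back in engine order. The pure-Python sketch below mimics that idea without a cluster, assuming contiguous chunks whose sizes differ by at most one. The names `scatter_chunks` and `gather_chunks` are made up, and this is not ipyparallel's implementation.

```python
def scatter_chunks(seq, n_engines):
    """Split seq into n_engines contiguous chunks of near-equal size."""
    base, extra = divmod(len(seq), n_engines)
    chunks, start = [], 0
    for i in range(n_engines):
        size = base + (1 if i < extra else 0)  # first `extra` chunks get one more
        chunks.append(seq[start:start + size])
        start += size
    return chunks


def gather_chunks(chunks):
    """Concatenate the chunks back into a single list, in chunk order."""
    return [item for chunk in chunks for item in chunk]


numbers = list(range(10))
chunks = scatter_chunks(numbers, 4)
# Each "engine" transforms its own chunk independently.
processed = [[n * n for n in chunk] for chunk in chunks]
result = gather_chunks(processed)
print(chunks)
print(result)
```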

### Magic Commands

You must instantiate a ```Client()``` before these commands will work!

In [None]:
%px print("Hello there!")

In [None]:
with dview.sync_imports():
    import os

In [None]:
%px print("Hello from {}".format(os.getpid()))

In [None]:
dview.scatter('Ns', numbers)
dview['Ns']

In [None]:
%%px
from fib import fibonacci
outputs = []
for i in Ns:
    outputs.append(fibonacci(i))

In [None]:
dview.gather('outputs', block=True)

### Running IPython Parallel across multiple computers

Running an ipyparallel cluster across multiple computers is a little more complicated. First, on the same machine on which you are running the notebook, you must run:

```
ipcluster --ip='<machine-ip-here>'
```

This will create a file in ```~/.ipython/profile_default/security/ipcontroller-engine.json```

In [None]:
!cat ~/.ipython/profile_default/security/ipcontroller-engine.json 

This needs to be copied into a file on each remote machine at the same location (```~/.ipython/profile_default/security/ipcontroller-engine.json```). Then, on each remote machine, you must run:

```
ipengine
```

This should give a little output that looks like the following:

```
2017-05-10 22:29:27.953 [IPEngineApp] Loading url_file '/home/ubuntu/.ipython/profile_default/security/ipcontroller-engine.json'
2017-05-10 22:29:27.960 [IPEngineApp] Registering with controller at tcp://172.30.0.39:48491
2017-05-10 22:29:28.015 [IPEngineApp] Starting to monitor the heartbeat signal from the hub every 3010 ms.
2017-05-10 22:29:28.019 [IPEngineApp] Completed registration with id 2
```

The important part is "Completed registration with id ...". Once that is complete, you should be able to load a client and distribute functions to the remote machines just as before.

In [None]:
client = ipp.Client()

In [None]:
client.ids

In [None]:
dv = client[:]

In [None]:
dv.scatter('Ns', numbers)
dv['Ns']

In [None]:
%%px
from fib import fibonacci
outputs = []
for i in Ns:
    outputs.append(fibonacci(i))

In [None]:
dv.gather('outputs', block=True)

In [None]:
# Dask thing huh?
# executor = client.become_dask(ncores=1)

# Celery

In [1]:
from celery_example.tasks import fibonacci

In [7]:
a = fibonacci.delay(30)
a

<AsyncResult: 3afbe9f3-2d69-45ca-b110-d972076b7b75>

In [8]:
a.get()

832040

In [12]:
import random
tasks = [fibonacci.delay(30 + random.randint(1, 5)) for _ in range(100)]
tasks

[<AsyncResult: 56c5f471-cdca-42ff-a00e-c926cc85a614>,
 <AsyncResult: 49642e98-e6d5-460f-adb3-2a584a99533b>,
 <AsyncResult: 24ff74ca-35f4-4f0b-82e0-abecfe0b42da>,
 <AsyncResult: 142a425b-fd9c-4403-8b1c-5cf51aeceece>,
 <AsyncResult: a0248ccd-dfbc-4809-afa2-908f9b3e4e57>,
 <AsyncResult: c71cacac-8f9e-4de9-8fbc-3878a1ce3759>,
 <AsyncResult: 9f38292d-f830-45f3-ac55-b78d58d9eeea>,
 <AsyncResult: 6abc1924-d276-4519-9657-5cff7497f161>,
 <AsyncResult: 718ba744-f056-47a4-b60b-a6273f87f757>,
 <AsyncResult: e77184a4-aeea-46c1-a596-ababe2fc808e>,
 <AsyncResult: b184346a-0342-4dc2-9a36-3eb326060547>,
 <AsyncResult: 7c8436fd-e813-44e6-86d7-54b7d87e9cf6>,
 <AsyncResult: db5d77e7-b1bf-4684-9684-decaab27835c>,
 <AsyncResult: 0fa15b1a-5eb6-472c-b616-540cca0dbf2b>,
 <AsyncResult: 6e9b4255-9e35-4612-82bb-b24b3a657696>,
 <AsyncResult: 542145c5-2d76-4692-83bb-5df91d5f11b4>,
 <AsyncResult: 77dedb75-464f-4305-84fa-e7f3cb63203e>,
 <AsyncResult: e84ec08b-fde8-48d4-8926-49777996a95e>,
 <AsyncResult: 34b66c5b-1795

In [11]:
[t.revoke() for t in tasks]

[None, None, None, None, None, None, None, None, None, None]