In [1]:
# Add all necessary imports here
import matplotlib.pyplot as plt
%matplotlib inline
plt.style.reload_library()
plt.style.use("ggplot")

# Python GIL
 
* Global Interpreter Lock
* default Python is designed with simplicity in mind, so they made it thread-safe (GIL)
* Restrict python to run in a single thread
* __exectues only one statement at a time (serial processing or single-threading)__
* Cannot make use of data stored in shared memory



![pGIL](img/pGIL.png)

## Python GIL problem

_**Factorial example using Threading**_

``` python
from datetime import datetime
import threading 

def factorial(number): 
    fact = 1
    for n in range(1, number+1): 
        fact *= n 
    return fact 

number = 100000 
thread = threading.Thread(target=factorial, args=(number,)) 
startTime = datetime.now() 
thread.start() 
thread.join() 
endTime = datetime.now() 
print "Time for execution: ", endTime - startTime
```


run time:
    * 1 Thread  : 3.4 sec
    * 2 Threads : 6.2 sec

- You don’t get the concurrency needed with Python multithreading because of the Global interpreter lock

# multi-threading vs. multi-processing

### multi-threading
* jobs pictured as "sub-tasks" of a single process 
* have access to the same memory (shared memory)
* can lead conflicts (improper synchronization) 
    * _writing to same memory location at the same time_

### multi-processing
* safer approach (although has communication overhead)
* each process is completed independently from each other


# Parallel python on HPC

1. Processors
2. Nodes
3. Internodes
![HPC](img/HPC.PNG)

**Parallel8**
```
#PBS -q parallel8
#PBS -l select=1:ncpus=8:mpiprocs=8:mem=10GB
```
**Parallel12**
```
#PBS -q parallel12
#PBS -l select=1:ncpus=12:mpiprocs=12:mem=10GB
```
**Parallel24**
```
#PBS -q parallel24
#PBS -l select=1:ncpus=24:mpiprocs=24:mem=10GB
```

## Map function
Used to run a function over multiple elements
``` python
def square(a):
    return a*a
outputs =[]
for i in inputs:
    outpus.append(square(i))
# or
outputs = [square(i) for i in inputs]
#or
outputs = map(f, inputs)
```
![mapfn](img/map_fn.PNG)


# Parallel frameworks 


- [multiprocessing](https://docs.python.org/2/library/multiprocessing.html)

- [__*concurrent.futures*__](https://docs.python.org/3/library/concurrent.futures.html)

- [__*joblib*__](https://pythonhosted.org/joblib/)

- [ipyparallel](https://ipyparallel.readthedocs.io/en/latest/)

- [MPI4py](http://mpi4py.readthedocs.io/en/stable/)

- [__*Dask*__](https://dask.pydata.org/en/latest/)

# futures (concurrent.futures)
* part of standard library (python 3.2)
* abstract layer on top of Python's threading and multiprocessing modules

* **`executor`**
    * abstract class (can not be used directly)
    * *`ThreadPoolExecutor`* :- multithreading
    * *`ProcessPoolExecutor`* :- multiprocessing
    * submit multiple tasks to `Pool`
    * `Pool` assign tasks and schedule them to run

# futures: sum of all primes below *n*

``` python
import concurrent.futures
import time

def is_prime(num):
    if num <= 1:
        return False
    elif num <= 3:
        return True
    elif num%2 == 0 or num%3 == 0:
        return False
    i = 5
    while i*i <= num:
        if num%i == 0 or num%(i+2) == 0:
            return False
        i += 6
    return True


def find_sum(num):
    sum_of_primes = 0
    ix = 2
    while ix <= num:
        if is_prime(ix):
            sum_of_primes += ix
        ix += 1
    return sum_of_primes
```


### multi threading

``` python
def sum_primes_thread(nums):
    with concurrent.futures.ThreadPoolExecutor(max_workers = 4) as executor:
        for number, sum_res in zip(nums, executor.map(find_sum, nums)):
            print("{} : Sum = {}".format(number, sum_res))
```
### multiprocessing

``` python
def sum_primes_process(nums):
    with concurrent.futures.ProcessPoolExecutor(max_workers = 4) as executor:
        for number, sum_res in zip(nums, executor.map(find_sum, nums)):
            print("{} : Sum = {}".format(number, sum_res))

if __name__ == '__main__':
    nums = [100000, 200000, 300000]
    start = time.time()
    sum_primes_thread(nums)
    sum_primes_process(nums
    print("Time taken = {0:.5f}".format(time.time() - start))
```

Output when executing `sum_primes_process`

`
100000 : Sum = 454396537
200000 : Sum = 1709600813
300000 : Sum = 3709507114
Time Taken = 0.71783
`

Output when executing `sum_primes_thread`

`
100000 : Sum = 454396537
200000 : Sum = 1709600813
300000 : Sum = 3709507114
Time Taken = 1.2338
`

## as_completed & wait

#### as_completed()
* yeilds results as soon as futures start resolving
* vs `map()` : returns the results in order

#### wait()
* returns tuple with two sets
* one with completed and other conatins the uncompleted one's

## as_completed & wait

``` python
from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from random import randint

def return_after_5_secs(num):
    sleep(randint(1, 5))
    return "Return of {}".format(num)

pool = ThreadPoolExecutor(5)
futures = []
for x in range(5):
    futures.append(pool.submit(return_after_5_secs, x))
```



*`as_completed`*

``` python
for x in as_completed(futures):
    print(x.result())
```

_`wait`_
``` python
print(wait(futures))
```
* `wait` controls : `return_when` : `FIRST_COMPLETED, FIRST_EXCEPTION`, `ALL_COMPLETED`

# joblib
* another parallel processing library
* developed by authors who work on *scikit-learn*
* also built on top of multiprocessing, multithreading
* ability to use a pool of worker like a context manager, reused across several tasks to be parallized
* if `njobs` set to 1, then it is puerly sequential mode, no overhead of setting up a pool

In [None]:
from joblib import Parallel, delayed
from math import sqrt
Parallel(n_jobs=1)(delayed(sqrt) (i**2) for i in range(10))
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

# MPI4py

* python binding for MPI (Message Passing Interface)
* _distributed_ parallel programming in python
* Based ob MPI-2 C++ bindings
* Almost all MPI calls supported
* API docs: http://pythonhosted.org/mpi4py/apiref/index.html

# Minimal mpi4py example

``` python
from mpi4py import MPI
com = MPI.COMM_WORLD
print("%d of %d" %(comm.Get_rank(), comm.Get_size()))
```

Use **mpirun** and **python** to execute this script

`$ mpirun -n 4 python script.py`


Notes:
* MPI_Init is called when mpi4py is imported
* MPI_Finalize is called when the scipt exits


## P2P communication

**send() and recv()**


* one to one - one node to another.
* one to many - one node to all nodes or many of them.
* many to one - many nodes, or all nodes, to one node (usually the master).






``` python
from mpi4py import MPI
import time

comm = MPI.COMM_WORLD

rank = comm.rank
size = comm.size
name = MPI.Get_processor_name()

shared = (rank+1)*5

if rank == 0:
    data = shared
    comm.send(data, dest=1)
    comm.send(data, dest=2)
    print 'From rank',name,'we sent',data
    time.sleep(5)

elif rank == 1:
    data = comm.recv(source=0)
    print 'on node',name, 'we received:',data


elif rank == 2:
    data = comm.recv(source=0)
    print 'on node',name, 'we received:',data
```

### Collective communication: Bcast

Broadcasting data to all the nodes

``` python
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.rank

if rank == 0:
    data = {'a':1,'b':2,'c':3}
else:
    data = None

data = comm.bcast(data, root=0)
print 'rank',rank,data
```
All the nodes have the same values for `data`

`$mpirun -np 5 python bcast.py
rank 0 {'a':1,'b':2,'c':3}
rank 4 {'a':1,'b':2,'c':3}
rank 1 {'a':1,'b':2,'c':3}
rank 3 {'a':1,'b':2,'c':3}
rank 2 {'a':1,'b':2,'c':3}
`

### Collective communication: Scatter

scatter the data elements around the processing nodes
``` python
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
   data = [(x+1)**x for x in range(size)]
   print 'we will be scattering:',data
else:
   data = None
   
data = comm.scatter(data, root=0)
print 'rank',rank,'has data:',data
```
Now we see elements of data is scattered among processors

`$mpirun -np 5 python scatter.py
we will be scattering: [1, 2, 9, 64, 625]
rank 0 has data: 1
rank 1 has data: 2
rank 3 has data: 9
rank 4 has data: 64
rank 5 has data: 625
`


*Note*: can only scatter as many elements as you have processors, An error is raised, if you attempt to scatter more elements than your processors

### Collectives comm: Gather

Opposite to Scatter, gathers all elements from the worker nodes on master node

``` python
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
   data = [(x+1)**x for x in range(size)]
   print 'we will be scattering:',data
else:
   data = None
   
data = comm.scatter(data, root=0)
data += 1
print 'rank',rank,'has data:',data

newData = comm.gather(data,root=0)

if rank == 0:
   print 'master:',newData
```


Output 

`
we will be scattering: [1, 2, 9, 64, 625]
rank 0 has data: 1
rank 2 has data: 9
rank 4 has data: 625
rank 3 has data: 64
rank 1 has data: 2
master collected: [2, 3, 10, 65, 626]
`

### MPI4py  communications

P2 comm:

* Send(data, dest, tag)
* Recv(data, source, tag)
* send/recv : general Python objects, **slow**
* Send/Recv : continuous arrays, **fast**

Collectives:

* Bcast (Broadcast)
* Scatter
* Gather 
* Reduction 

Tutorial:
http://mpi4py.readthedocs.io/en/stable/tutorial.html

MPI4py API reference
http://mpi4py.scipy.org/docs/apiref/frames.html






![dask](img/dask-logo.png)


* Provides advanced parallelism for analytics
* That leverages the excellent python ecosystem 
   - Pandas, Numpy, Scikit-learn
   - Multicore and distributed clusters
* Using blocked algorithms and task scheduling
* Written in pure python (pip install, conda install)

 

# Dask Architectue

- High Level collections
- Dask graphs
- Task schedulers


![dask-arch](img/dask-arch2.PNG)

# dask.array
![dask-arr](img/dask-array.PNG)

Out-of-core, parallel, n-dimensional array library     
* Compies the numpy interface
   - Arithmetic, scalar math: `+, \*, exp, log,..`
   - Axes reductions: `sum(), mean(), std(), sum(axis=0),..`
   - some linalg: `tensordot, qr, svd`
   - Slicing `x[:100, 500:100:-2]`
 
* New operations
  - Parallel algo (`approximate quantiles, topk`,..)
  - Integration with HDF5

# example

1. Construct a 20k x 20k array of normally distributed randome values broken up into 1000x1000 sized chunks
2. Take the mean along one axis
3. Take every 100th element

### NumPy

```python
import numpy as np

%%time
x = np.random.normal(10, 0.1, size=(20000,20000))
y = x.mean(axis=0)[::100]
y
```
```
CPU times: user 19.6 s, sys: 160 ms, total: 19.8 s
Wall time: 19.7 s
```
### Dask Array
```python
import dask.array as da

%%time
x = da.random.normal(10, 0.1, size=(20000, 20000), chunks=(1000, 1000))
y = x.mean(axis=0)[::100] 
y.compute() 
```
```
CPU times: user 29.4 s, sys: 1.07 s, total: 30.5 s
Wall time: 4.01 s
```

# dask.dataframe


* Out-of-core, blocked, parallel df
* copies the Pandas API

![dask-df](img/dask-df.PNG)
    

# dask.bag

* parallelizes across large collections of generic Python objects
* dealing semi-structured like JSON blobs or log files
* mimics iterators, Toolz and PySpark


` dask.bag = map, filter, toolz + multiprocessing`

# dask.bag creation



### from seq
```python
import dask.bag as db
b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
```

### from files
```python 
import os
b = db.from_filenames(os.path.join('data', 'accounts.*.json.gz'))
```

### from s3
```python
### Requires `boto` library
b = db.from_s3('s3://nyqpug/tips.csv')
```

# dask.bag storing

#### In Memory
` result = b.compute()`
#### To Textfiles
` b.to_textfiles('/path/to/data/*.json.gz')`
#### To DataFrames
``` 
df = b.to_dataframe()
df.compute()
```

# dask graph
* Dictionary of (`name: task`)
* Tasks are tuples of (`func, args..`) (lispy syntax)
* Args can be names, values or tasks

![dask-graph](img/dask-graph.PNG)

### graph example

```python
from dask.dot import dot_graph
def inc(i):
    return i + 1

def mul(c, d):
    return c*d


d= {'a': 1,
    'b': 2,
    'x': (inc, "a"),
    'y': (inc, "b"),
    'z': (mul, "x", "y")
    }
    
dot_graph(dsk)
```

![dask-graph2](img/dask-graph2.png)

# dask.shedulers


#### Single machine scheduler

* Local Threads

```python
import dask
dask.set_options(get=dask.threaded.get)
```



* Local Processes

  - executes with local `multiprocessing.Pool`
  - default scheduler for dask bag
  
```python
import dask.multiprocessing
dask.get_options(get=dask.multiprocessing.get)
```

# dask.schedulers - distributed

#### local
```python
from dask.distributed import Client, progress
client = Client(processes=False, threads_per_worker=4, n_workers=1)
```

#### distributed

* more sophisticated, but more features
* manual setup
  - `dask-sheduler` and `dask-worker`
* SSH
* Kubernetes
* HPC (SLURM, SGE, PBS, MPI)


## dask.distributed - HPC

![dask-mpi](img/dask-mpi.PNG)

# takeaways
* Python can still handle large datasets using blocked algo

* Dask collections form task graphs expressing these algprithms

* Dask schedulers execute these graphs in parallel

* Dask graphs can be directly created for custom pipelines

## parallelism norms 
 
 **multi-process**, not multi-thread
 
 **multi-node**, not multi-core
 
 **message-passing**, not shared memory
 



### **_Frameworks_**
 * futures/joblib
 * dask (data intensive tasks)
     - computer/multicore/node/cluster

# on HPC

Normal Python: Single processor 

parallel8/12/24 (Single Node)
* python cannot make use of thos 8/12/24 processors
* _mulitprocessing, futures, joblib_ 
* #PBS -l select=**1**:ncpus=24:mpiprocs=24:mem=160GB

parallel8/12/24 (Multi Node)
* distributed parallelism
* _MPI4py, Dask_
* #PBS -l select=**2**:ncpus=24:mpiprocs=24:mem=160GB

# new on the plate

* Bioinformatics services on HPC
  - nextflow pipelines (RNAseq/other NGS)
  - Database services


* cloud services
  - AWS
  - Any Deep learning/ Other GPU jobs
  

# References

* http://sebastianraschka.com/Articles/2014_multiprocessing.html
* http://masnun.com/2016/03/29/python-a-quick-introduction-to-the-concurrent-futures-module.html
* http://pydata.github.io

![thanks](img/thanks.PNG)