
# Parallel python

We are going to learn about four different ways to parallelize problems in python: python threading module, Numba, Pytorch, and Dask. All four are useful but are generally aimed at attacking different types of parallel problems.

We are going to begin with some important concepts that all parallel approaches deal with/hide from you. These concepts are going to keep coming up in all the different parallel approaches we use in the class.

The first thing to understand is on modern computers hundreds of different tasks are running simultaneously. Most of these tasks spend 99%+ of their time sleeping. In any parallel approach the first thing you do is create a new thread.
In its simplist form we are creating a completely dumb thread that runs completely independently from the main thread in the program.  The first approach we are
going to cover creates a thread and tells it what to do by assigning it a function to
run.

In [1]:
import threading

def findPrimes(lower,upper):  #calculate prime numbers in a range
    for num in range(lower, upper + 1):
       # all prime numbers are greater than 1
        if num > 1:
            for i in range(2, num):
                if (num % i) == 0:
                   break
            else:
               print(num)

p=threading.Thread(target=findPrimes,args=(1000,2000)) #create a new thread object


/bin/sh: jt: command not found


At this stage I've just defined what the thread is supposed to I haven't told it to begin.  I use the start function to tell the thread to do some work.

In [2]:
p.start()

1009
1013
1019
1021
1031
1033
1039
1049
1051
1061
1063
1069
1087
1091
1093
1097
1103
1109
1117
1123
1129
1151
1153
1163
1171
1181
1187
1193
1201
1213
1217
1223
1229
1231
1237
1249
1259
1277
1279
1283
1289
1291
1297
1301
1303
1307
1319
1321
1327
1361
1367
1373
1381
1399
1409
1423
1427
1429
1433
1439
1447
1451
1453
1459
1471
1481
1483
1487
1489
1493
1499
1511
1523
1531
1543
1549
1553
1559
1567
1571
1579
1583
1597
1601
1607
1609
1613
1619
1621
1627
1637
1657
1663
1667
1669
1693
1697
1699
1709
1721
1723
1733
1741
1747
1753
1759
1777
1783
1787
1789
1801
1811
1823
1831
1847
1861
1867
1871
1873
1877
1879
1889
1901
1907
1913
1931
1933
1949
1951
1973
1979
1987
1993
1997
1999


It is important to note that we now have two independent paths in our program. At this stage our main thread is continuing down our python code but the thread we've name *p* is executing only the function *findPrimes*. To demonstrate this concept. Look at the code below. What do you think will be printed.

In [3]:
last=0
def lastPrime(lower,upper):  #calculate prime numbers in a range
    print(last)
    for num in range(lower, upper + 1):
       # all prime numbers are greater than 1
        if num > 1:
           for i in range(2, num):
                if (num % i) == 0:
                   break
        else:
            last=num
   
p2=threading.Thread(target=lastPrime,args=(1000,2000)) #create a new thread object
p2.start()


Exception in thread Thread-5:
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/opt/anaconda3/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-3-7b924858c349>", line 3, in lastPrime
    print(last)
UnboundLocalError: local variable 'last' referenced before assignment



You should be seeing an error because when you create a thread it by default has no knowledge of any variables that existed in our main program. Lets make a slight modification to our function and try again. This time passing in last.

In [70]:
last=5
def lastPrime(lower,upper,last):  #calculate prime numbers in a range
    for num in range(lower, upper + 1):
       # all prime numbers are greater than 1
       if num > 1:
            for i in range(2, num):
                if (num % i) == 0:
                    break
            else:
                last=num
   
p2=threading.Thread(target=lastPrime,args=(1000,2000,last)) #create a new thread object
p2.start()

print("last in the main program",last)

last in the main program 5


There are two possible reasons that 5 gets printed here. The first is that the program lastPrime has not started so last not been updated. To test this we can introduce a now thread function, *join*. The join function creates a barrier in the main program. Telling it wait for the tread to finish.

In [71]:
last=5
def lastPrime(lower,upper,last):  #calculate prime numbers in a range
    for num in range(lower, upper + 1):
       # all prime numbers are greater than 1
       if num > 1:
            for i in range(2, num):
                if (num % i) == 0:
                    break
            else:
                last=num
   
p2=threading.Thread(target=lastPrime,args=(1000,2000,last)) #create a new thread object
p2.start()
p2.join()
print("last in the main program",last)

last in the main program 5


Seeing 5 again shows you that the value of last is copied rather than the memory location. All of the basic types int, float, str will have this same behavior. If you remember the python notebook we saw that with more complicated types like lists we were actually passing references not the actual values. Take a guess what the following cell will produce before executing it.

In [7]:
last=[5]
def lastPrime(lower,upper,last):  #calculate prime numbers in a range
    for num in range(lower, upper + 1):
       # all prime numbers are greater than 1
       if num > 1:
            for i in range(2, num):
                if (num % i) == 0:
                    break
            else:
                last[0]=num
   
p3=threading.Thread(target=lastPrime,args=(1000,2000,last)) #create a new thread object
p3.start()
print("last in the main program",last)

last in the main program [1553]


If you execute the previous multiple times you should get different results. I've removed the join statement we are seeing the value of last at the instant that the main thread gets to the statement. That instance will vary from run to run. If we put back our join statement we get the expected result.

In [98]:
last=[5]
def lastPrime(lower,upper,last):  #calculate prime numbers in a range
    for num in range(lower, upper + 1):
       # all prime numbers are greater than 1
       if num > 1:
            for i in range(2, num):
                if (num % i) == 0:
                    break
            else:
                last[0]=num
   
p3=threading.Thread(target=lastPrime,args=(1000,2000,last)) #create a new thread object
p3.start()
p3.join()
print("last in the main program",last)

last in the main program [1999]


Now lets introduce another thread and a new problem.


In [10]:
last=[0]
def incrementMe(last):
    for i in range(1000000):
        last[0]=last[0]+1

p4=threading.Thread(target=incrementMe,args=(last,)) #create a new thread object
p5=threading.Thread(target=incrementMe,args=(last,)) #create a new thread object
p4.start()
p5.start()
p4.join()
p5.join()
print(last)

[1289487]


What we are seeing is the result of a race condition.  Both threads are changing the same value. The value of *last*. One way to think about it is p4 requests *last* before it has a chance to change it and have its value sent to cache and cache coherence between the different cores has been achieved thread p5 has requested the same value. The mechanism to deal with this problem is called a lock.  A lock, locks all but a single thread from accessing the code within the *with* statement.  Run the following cell and you can see we now get the correct answer.

In [11]:
%%time
last=[0]
def incrementMe(last,lock):
    for i in range(1000000):
        with lock:
            last[0]=last[0]+1

lock=threading.Lock()
p4=threading.Thread(target=incrementMe,args=(last,lock)) #create a new thread object
p5=threading.Thread(target=incrementMe,args=(last,lock)) #create a new thread object
p4.start()
p5.start()
p4.join()
p5.join()
print(last)

[2000000]
CPU times: user 645 ms, sys: 23.4 ms, total: 668 ms
Wall time: 695 ms


Locks have  a significant downside. They are barrier statements. A thread is stopped (doing nothing) until it acquires a lock. In this trivial example we have basically made a parallel program serial plus the added cost of acquiring the lock. Play with the following cell adjusting the number of threads.

In [16]:
%%time
last=[0]
def incrementMe(last,lock,num):
    for i in range(num):
        with lock:
            last[0]=last[0]+1

lock=threading.Lock()
nthreads=100
countTo=int(1000*1000*10/nthreads)
ps=[]
for i in range(nthreads):
    ps.append(threading.Thread(target=incrementMe,args=(last,lock,countTo)))

for p in ps:
    p.start()

for p in ps:
    p.join()

print(last)


[10000000]
CPU times: user 47.6 s, sys: 2min 41s, total: 3min 28s
Wall time: 1min 35s


If we do loop on a single thread. The performance is orders of magnitude better.

In [116]:
%time
last=[0]
for i in range(10*1000*1000):
    last[0]+=1

CPU times: user 3 µs, sys: 1e+03 ns, total: 4 µs
Wall time: 6.91 µs


The lesson is when doing things in parallel you want to code in a pattern or style that avoids the needs for locks as much as possible.

## An example when threading is useful

Lets say you need to process a large number of files.  The processing of these files involves reading them, applying some filter to them, and writing them out.  You time the various parts of your code and figure out that the IO takes about the same amount of time as the actual processing.

In [132]:
class buffer:
    """A Simple class that stores my data"""
    def __init__(self,sz):
        self._buf=[]
        self._sz=sz;

        
        
def readFile(self,filename,buf):
    """I am going to read the file from source....normally"""
    #f.open(filename)
    pass;

def writeFile(self,filename,buf):
    """I am going to write the file to some destination....normally"""
    #f=open(filename,w+)
    pass;

def processData(bufIn,bufOut):
    """Process the data"""
    pass;        

class proccessALot:
    def __init__(self,fileList,nsz):
        self._fileList=fileList
        self._nsz=nsz

    def processList(fileList,nsz):
        """Read an process a bunch of files"""
        bufRead=buffer(nsz)
        bufWrite=buffer(nsz)  #buffer used for writing
        bufIn=buffer(nsz)   #buffer used for processing input
        bufOut=buffer(nsz)  #buffer used for processing output
       
        readThread=threading.Thread(target=readFile,args=(fileList[0],bufRead))
        readThread.start() #start the first thread
    
        for i in range(len(fileList)):
            readThread.join()
            bt=bufIn  #store temp location
            bufIn=bufRead # Remember this just makes a copy of the pointer to memory
            bufRead=bt #Finish pointer swap
            if i < len(fileList)-1:
                readThread=threading.Thread(target=readFile,args=(fileList[i+1],bufRead))
                readThread.start() 
            processData(bufIn,bufOut)
            if i>0:
                writeThread.join()
            bt=bufWrite
            bufWrite=bufOut
            bufOut=bt
            writeTread=threading.Thread(target=writeFile,args=(fileList[i],bufWrite))
        writeThread.join()
        
    
    
    

We now have a function that will overlap the processing with the reading and writing. We could expand. We could try making this even more parallel by having splitting up the list into smaller lists and either have them run on different machines or on the same machine. For example this is how we can expand on the capabilities of our processAlot function on a single machine.

In [136]:
import math
class processParallelLocal(proccessALot):
    def __init__(self,fileList,nsz,ninstances):
        super().__init__(fileList,nsz)
        self._ninstances=ninstances
    def DoWork(self):
        n=self._ninstances
        ibeg=0
        nd=len(fileList)
        threads=[]
        for i in range(self._ninstances):
            nb=math.ceil(nd/n)
            lst=fileList[ibeg:ibeg+nb]
            lout=lst[ibeg:ibeg+nb]
            n=n-1
            nd=nd-nb
            ibeg+=nb
            threads.append(threading.Thread(target=self.processList,args=(lout,self._nsz)))
        for t in threads:
            t.start()
        for t in threads:
            t.join()
            

We've now can process multiple files simultaneously on one machine.

## Calculating pi

There are several different ways to caculate pi. One of the more interesting is to use random numbers.  Imagine drawing $\frac{1}{4}$ of a circle centered at zero with a radius of 1 in a the plane bounded by the x and y axis and $x=1$ and $y=1$. The area of the $\frac{1}{4}$ of the circle is $\frac{\pi r^2}{4}= \frac{\pi}{4}$ with the area of the entire domain being is 1. If we select a random location in the plane we should have a $\frac{\pi}{4}$ chance of being inside the circle.  The more random guesses we test to see if we are inside the circle the better our estimate of $\pi$. The following function demonstrates the concept. Note that I am going to write the function using numba to improve speed.

In [18]:
import numba
import numpy as np
@numba.jit(nopython=True)
def calcPi(num):
    tot=0
    inside=0
    while tot < num:
        blk=int(min(num-tot,1000*1000))
        x=np.random.rand(blk)
        y=np.random.rand(blk)
        for i in range(blk):
            if x[i]*x[i]+y[i]*y[i] <=1.:
                inside+=1
        tot+=blk
    return inside/num*4
    

In [24]:
print(calcPi(10000000))

3.1414812


In order to parallelize this algorithm we are going to do a common parallelization technique we are going to increase the dimensionality of the output vector and handle it after a parallel region.

In [25]:
import threading
import numpy as np
@numba.jit(nopython=True)
def calcPi(num,res,ith):
    tot=0
    inside=0
    while tot < num:
        blk=int(min(num-tot,1000*1000))
        x=np.random.rand(blk)
        y=np.random.rand(blk)
        for i in range(blk):
            if x[i]*x[i]+y[i]*y[i] <=1.:
                inside+=1
        tot+=blk
    res[ith]=inside/num*4
nthreads=5
nattempts=1000*1000*1000
perThreadAttempts=int(nattempts/nthreads)
threads=[]
pie=np.zeros(nthreads)
for i in range(nthreads):
    threads.append(threading.Thread(target=calcPi,args=(perThreadAttempts,pie,i)))
for t in threads:
    t.start()
for t in threads:
    t.join()
tot=0
for p in pie:
    tot+=p/nthreads
print(tot)

3.141631704


### Parallelizing in Numba

In the above example what we really did is break up into nthread pieces, have different threads run different portions of the loop and then combine the result at the end. This is probably the most common parallelization technique used in numerical calculations and is supported by virtually all parallelization libraries in a simpler form.  In numba all we need to do is add the *parallel=True* to jit line.

In [27]:
%%time
@numba.jit(nopython=True,parallel=True)
def calcPi(num):
    tot=0
    inside=0
    blk=1000*1000
    while tot < num:
        blk=int(min(num-tot,blk))
        x=np.random.rand(blk)
        y=np.random.rand(blk)
        for i in range(blk):
            if x[i]*x[i]+y[i]*y[i] <=1.:
                inside+=1
        tot+=blk
    return inside/num*4
calcPi(1000*1000*1000)

CPU times: user 30 s, sys: 3.38 s, total: 33.4 s
Wall time: 10.3 s


3.141608788

What numba is actually doing is parallelizing the for loop.  It creates a number of parallel threads that are each assinged to different portions of the range of *blk*. It also notices that each thread needs its own copy of *inside* and that the results of each threads version of *inside* needs to be summed at the end of the loop.  Try adjusting *blk*.  What you should notice is if you make the size of *blk* to small the total compute time goes up because the cost of setting up and destroying the parallel region comes to dominate. 

## Terminology

The above examples introduce the need for some additional terminology when dealing with parallel loops. The important terms 

- private Variables that each thread has their own copy of
- shared  Variables that all threads share
- reduction Variables that combined in some manner at the end of a loop
- firstprivate, lastprivate - For private variables that also exist in the main what to set the main variables to when entering and existing a parallel region




In [1]:
import threading
import numpy as np
import numba
import math
@numba.njit()
def squareIt(ar,parallel=True):
    for i3 in numba.prange(ar.shape[0]):
        for i2 in range(ar.shape[1]):
            for i1 in range(ar.shape[2]):
                ar[i3,i2,i1]=ar[i3,i2,i1]*ar[i3,i2,i1]

In [2]:
x=np.random.rand(1000,10000,100)

    

In [74]:
%%time
for i in range(5):
    squareIt(x)

CPU times: user 29.6 s, sys: 11.5 s, total: 41.1 s
Wall time: 46 s


TypeError: write() argument must be str, not int

In [15]:
import threading
import numpy as np
import numba
import math
@numba.njit()
def mathOp(ar):
    for i3 in range(ar.shape[0]):
        for i2 in range(ar.shape[1]):
            for i1 in range(ar.shape[2]):
                ar[i3,i2,i1]=math.sin(math.cos(ar[i3,i2,i1])*math.sin(ar[i3,i2,i1]))

In [16]:
%%time
for i in range(5):
    mathOp(x)

CPU times: user 1min 50s, sys: 120 ms, total: 1min 50s
Wall time: 1min 50s


In [3]:
import threading
import numpy as np
import numba
import math
@numba.njit(parallel=True)
def mathOpP1(ar):
    for i3 in range(ar.shape[0]):
        for i2 in range(ar.shape[1]):
            for i1 in numba.prange(ar.shape[2]):
                ar[i3,i2,i1]=math.sin(math.cos(ar[i3,i2,i1])*math.sin(ar[i3,i2,i1]))

In [None]:
%%time
for i in range(5):
    mathOpP1(x)

In [4]:
import threading
import numpy as np
import numba
import math
@numba.njit(parallel=True)
def mathOpP1(ar):
    for i3 in numba.prange(ar.shape[0]):
        for i2 in range(ar.shape[1]):
            for i1 in range(ar.shape[2]):
                ar[i3,i2,i1]=math.sin(math.cos(ar[i3,i2,i1])*math.sin(ar[i3,i2,i1]))

In [5]:
%%time
for i in range(5):
    mathOpP1(x)

CPU times: user 6min 46s, sys: 472 ms, total: 6min 46s
Wall time: 15.5 s


In [6]:
import threading
import numpy as np
import numba
import math
@numba.njit(parallel=True)
def mathOpP2(ar):
    for i3 in range(ar.shape[0]):
        for i2 in numba.prange(ar.shape[1]):
            for i1 in range(ar.shape[2]):
                ar[i3,i2,i1]=math.sin(math.cos(ar[i3,i2,i1])*math.sin(ar[i3,i2,i1]))

In [8]:
%%time
for i in range(5):
    mathOpP2(x)

CPU times: user 5min 44s, sys: 4.01 s, total: 5min 48s
Wall time: 12.2 s


$\frac{\partial^2 u}{\partial t^2}=v^2\nabla^2 U$

u[t+1,ix,iy,iz] -2 u[t,ix,iy,iz] + u[t-1,ix,iy,iz] = v^2 ( 8 * u[t,ix,iy,iz]- u[t,ix-1,iy,iz]-u[t,ix+1,iy,iz] -u[t,ix,iy+1,iz] - u[t,ix,iy-1,iz] - u[t,ix,iy,iz+1] - u[t,ix,iy,iz-1])