# Multithreading and Multiprocessing

In this module, you'll learn:

- How much (or little) parallelism you can get with Python threads
- How to use synchronization primitives with threading
- How to use the multiprocessing module to better utilize multicore machines


# Threading: the Global Interpreter Lock

- Only one thread at a time can execute Python bytecode
- C libraries can release the GIL
  - I/O libraries, NumPy, etc.
- Python threads are real, preemptive OS threads
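Here is a quick way to see the GIL being released. `time.sleep`, like blocking I/O, gives up the GIL while it waits, so several sleeping threads overlap instead of queuing up. This is a minimal sketch. The helper names and the 0.2 s delay are arbitrary choices for illustration.

```python
import threading
import time

def blocking_call(delay):
    # time.sleep releases the GIL while waiting, as blocking I/O calls do
    time.sleep(delay)

def run_in_threads(n_threads, delay):
    threads = [threading.Thread(target=blocking_call, args=(delay,))
               for _ in range(n_threads)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

elapsed = run_in_threads(4, 0.2)
# The four waits overlap, so this is close to 0.2 s rather than 0.8 s
print(f'4 threads x 0.2 s sleep took {elapsed:.2f} s')
```

A pure-Python CPU loop in place of `time.sleep` would not overlap this way, because it holds the GIL while it runs.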


# Python Threads

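`threading.Thread(group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)`
- target - Python function to call
- args, kwargs - arguments to the function
- pass these by keyword (`target` is not the first positional parameter)
- can also subclass & override run()

`start()` launches the thread, whose default `run()` calls `target(*args, **kwargs)`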
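Both styles side by side, as a small sketch. `greet` and `Squarer` are hypothetical names. A subclass that defines `__init__` must call `Thread.__init__()` before doing anything else.

```python
import threading

results = {}

def greet(name, punctuation='.'):
    # Called in the new thread as target(*args, **kwargs)
    results[name] = f'Hello, {name}{punctuation}'

t = threading.Thread(target=greet, args=('Ada',), kwargs={'punctuation': '!'})
t.start()   # start() creates the OS thread, which then calls run()
t.join()    # wait for the thread to finish

class Squarer(threading.Thread):
    """Subclass form: override run() instead of passing target."""
    def __init__(self, n):
        super().__init__()   # required before using the thread
        self.n = n
        self.result = None

    def run(self):
        self.result = self.n * self.n

s = Squarer(12)
s.start()
s.join()
print(results['Ada'], s.result)  # Hello, Ada! 144
```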


## Basic threading example

In [2]:
import threading

class MyThread(threading.Thread):
    
    def run(self):
        self.text = input('Enter something: ')
        
input('Press enter to begin: ')
thread = MyThread()
thread.start()

count, result = 1, 1

while thread.is_alive():
    result = count * count
    count += 1
        
# count was incremented once more after the last square was computed
print(f'calculated squares up to {count - 1} * {count - 1} = {result:,} while you typed {thread.text}')

Press enter to begin: 
Enter something: asdf
calculated squares up to 369187 * 369187 = 136,299,040,969 while you typed asdf


## Classless threading

In [3]:
def cabbage():
    global text
    text = input('Enter something: ')
    
input('Press enter to begin: ')
thread = threading.Thread(target=cabbage)
thread.start()

count, result = 1, 1

while thread.is_alive():
    result = count * count
    count += 1
    
print(f'calculated squares up to {count - 1} * {count - 1} = {result:,} while you typed {text}')

Press enter to begin: 
Enter something: asfd
calculated squares up to 549957 * 549957 = 302,452,701,849 while you typed asfd
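The `global text` works, but it ties the thread function to a module-level name. One alternative is to hand the thread a mutable container to fill in and read it after `join()`. This is a sketch, not the original code. `read_value` and `fake_input` are hypothetical, and `fake_input` stands in for `input()` so the example runs without a keyboard.

```python
import threading

def read_value(out, prompt_fn):
    # Store the result in the caller-supplied dict instead of a global
    out['text'] = prompt_fn()

def fake_input():
    # Stand-in for input('Enter something: ') so the sketch runs unattended
    return 'asdf'

box = {}
thread = threading.Thread(target=read_value, args=(box, fake_input))
thread.start()
thread.join()       # after join(), the thread's writes are visible here
print(box['text'])  # asdf
```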


# Threading for performance (?!)

In [4]:
import time
import json
from urllib.request import urlopen
from urllib.parse import urlencode

APPID = '10d4440bbaa8581bb8da9bd1fbea5617'
UNITS = 'metric'

cities = [
    'Boulder', 'Atlanta', 'San Francisco',
    'Reno', 'Honolulu', 'Zurich', 'Dubai',
    'Dublin', 'Stuttgart', 'Rome', 'Singapore',
    'Bangalore', 'Hyderabad', 'Hong Kong',
    'Durham', 'New Orleans', 'Portland',
]

def get_temp(city, temps, units=UNITS, appid=APPID):
    qs = urlencode({
        'q': city,
        'units': units,
        'appid': appid,
    })
    url = f'http://api.openweathermap.org/data/2.5/weather?{qs}'
    with urlopen(url) as resp:
        data = json.load(resp)
        temps[city] = data['main']['temp']
        
def run_serially():
    for city in cities:
        get_temp(city, temps)
        
def run_threaded():
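    # Create the threads (the kwargs switch these requests to imperial units,
    # which is why the threaded run below reports °F instead of °C)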
    threads = [
        threading.Thread(target=get_temp, args=(c, temps), kwargs={'units': 'imperial'}) 
        for c in cities
    ]

    # start all threads
    for thread in threads:
        thread.start() # not run()

    # wait for all threads to complete
    for thread in threads:
        thread.join()

In [5]:
%%time
temps = {}

run_serially()

for k, v in sorted(temps.items()):
    print(f'it is {v:.0f}° in {k}')

it is 23° in Atlanta
it is 20° in Bangalore
it is 7° in Boulder
it is 31° in Dubai
it is 13° in Dublin
it is 27° in Durham
it is 25° in Hong Kong
it is 26° in Honolulu
it is 29° in Hyderabad
it is 23° in New Orleans
it is 8° in Portland
it is 11° in Reno
it is 22° in Rome
it is 13° in San Francisco
it is 27° in Singapore
it is 16° in Stuttgart
it is 18° in Zurich
CPU times: user 25.7 ms, sys: 11.1 ms, total: 36.8 ms
Wall time: 1.9 s


In [6]:
%%time
temps = {}

run_threaded()

for k, v in sorted(temps.items()):
    print(f'it is {v:.0f}° in {k}')

it is 73° in Atlanta
it is 68° in Bangalore
it is 45° in Boulder
it is 88° in Dubai
it is 56° in Dublin
it is 80° in Durham
it is 76° in Hong Kong
it is 78° in Honolulu
it is 84° in Hyderabad
it is 74° in New Orleans
it is 47° in Portland
it is 52° in Reno
it is 72° in Rome
it is 56° in San Francisco
it is 80° in Singapore
it is 62° in Stuttgart
it is 65° in Zurich
CPU times: user 41.6 ms, sys: 386 µs, total: 42 ms
Wall time: 675 ms


In [7]:
%time get_temp('Zurich', {})

CPU times: user 7.76 ms, sys: 279 µs, total: 8.04 ms
Wall time: 85.2 ms


In [8]:
len(cities)

17

In [9]:
17*85.2

1448.4
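The arithmetic works out as follows. 17 requests at roughly 85 ms each add up to about 1.45 s when they are issued one after another. That is in line with the 1.9 s serial wall time. The threaded run overlaps the waits and finishes in about 675 ms. You can reproduce the effect offline by replacing the HTTP request with `time.sleep`. In this sketch, `fake_get_temp` and the city names are hypothetical, and the 85 ms latency is borrowed from the `%time` measurement above.

```python
import threading
import time

LATENCY = 0.085  # seconds per simulated request (from the %time above)
CITIES = [f'city{i}' for i in range(17)]

def fake_get_temp(city, temps):
    time.sleep(LATENCY)  # stands in for waiting on the network
    temps[city] = 20.0

def timed(fn):
    start = time.perf_counter()
    fn()
    return time.perf_counter() - start

def serial(temps):
    for c in CITIES:
        fake_get_temp(c, temps)

def threaded(temps):
    threads = [threading.Thread(target=fake_get_temp, args=(c, temps))
               for c in CITIES]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

serial_temps, threaded_temps = {}, {}
t_serial = timed(lambda: serial(serial_temps))        # about 17 * LATENCY
t_threaded = timed(lambda: threaded(threaded_temps))  # about 1 * LATENCY
print(f'serial {t_serial:.2f} s, threaded {t_threaded:.2f} s')
```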

# Synchronization primitives

Like other threading libraries, Python has support for `Lock`s, `Event`s, and `Semaphore`s:

In [10]:
lock = threading.Lock()
lock.acquire()
# critical work here (if it raises, release() is never called)
lock.release()

In [11]:
# Better
lock = threading.Lock()
with lock:
    print('Do things with lock locked, will be released after block')

Do things with lock locked, will be released after block
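The lock matters because `counter += 1` is a read-modify-write spread over several bytecodes. A thread switch in the middle of it can lose an update. The sketch below guards a shared counter. The names are hypothetical.

```python
import threading

counter = 0
counter_lock = threading.Lock()

def increment(n):
    global counter
    for _ in range(n):
        with counter_lock:  # only one thread at a time runs the += below
            counter += 1

threads = [threading.Thread(target=increment, args=(100_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 400000
```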


In [12]:
sem = threading.Semaphore(4)
sem.acquire()
# up to 4 threads could be running in here
sem.release()

In [13]:
# Better
sem = threading.Semaphore(4)
with sem:
    print('Running in up to 4 different threads')


Running in up to 4 different threads
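The cells above show `Lock` and `Semaphore` but not `Event`. An `Event` is a flag that one thread sets and any number of other threads wait on. This is a minimal sketch with hypothetical names.

```python
import threading

ready = threading.Event()
log = []

def waiter(name):
    ready.wait()  # blocks until some thread calls ready.set()
    log.append(f'{name} proceeding')

threads = [threading.Thread(target=waiter, args=(f'w{i}',)) for i in range(3)]
for t in threads:
    t.start()

log.append('main: setting event')
ready.set()  # wakes every thread blocked in wait()
for t in threads:
    t.join()

print(log[0])          # main: setting event
print(sorted(log[1:]))  # ['w0 proceeding', 'w1 proceeding', 'w2 proceeding']
```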


### Synchronized queue class

If you structure your threads to send/receive data rather than just _share_ data, you can use a `queue.Queue`:

In [14]:
import queue
import threading 

def worker(q):
    while True:
        value = q.get()
        print('I got a {}'.format(value))
        if value is None:
            print('Got none, so exiting')
            break

q = queue.Queue()
thd = threading.Thread(target=worker, args=(q, ))
thd.start()        

In [15]:
q.put('Hello there')

I got a Hello there


In [16]:
q.put('General Kenobi')

I got a General Kenobi


In [17]:
thd.is_alive()

True

In [18]:
q.put(None)

I got a None
Got none, so exiting


In [19]:
thd.is_alive()

False

In [20]:
import logging.handlers

In [21]:
logging.handlers.QueueListener?
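`QueueListener` applies the same queue pattern to logging. Threads log through a `QueueHandler`, which only enqueues records. A single listener thread then passes those records to the real handlers. In this sketch, `ListHandler` is a hypothetical handler that collects messages so the result can be checked, and the logger name is arbitrary.

```python
import logging
import logging.handlers
import queue
import threading

class ListHandler(logging.Handler):
    """Hypothetical handler that keeps formatted messages in a list."""
    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(self.format(record))

log_queue = queue.Queue()
collector = ListHandler()
# The listener runs its own thread, pulling records off log_queue
listener = logging.handlers.QueueListener(log_queue, collector)

logger = logging.getLogger('queue-demo')
logger.setLevel(logging.INFO)
logger.addHandler(logging.handlers.QueueHandler(log_queue))  # emit() just enqueues
logger.propagate = False  # keep the demo's records away from the root logger

def work(n):
    logger.info('worker %d done', n)

listener.start()
threads = [threading.Thread(target=work, args=(n,)) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
listener.stop()  # handles any records still queued, then stops the thread

print(sorted(collector.messages))
```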

### Simple thread pool

(The standard library also has ready-made thread pools: `multiprocessing.pool.ThreadPool` and `concurrent.futures.ThreadPoolExecutor`.)

In [22]:
class Pool:
    def __init__(self, count):
        self.count = count
        self.job = queue.Queue()
        self.result = queue.Queue()
        self.threads = [
            threading.Thread(target=self.worker)
            for i in range(count)
        ]
        for t in self.threads:
            # t.daemon = True  # make the thread 'daemonic' so it won't block interpreter exit
            t.start()
            
    def worker(self):
        while True:
            job = self.job.get()
            try:
                result = job()
            except Exception as err:
                self.result.put((False, err))
            else:
                self.result.put((True, result))
                

In [23]:
import time
import random

pool = Pool(4)

def job():
    print('Starting job', flush=True)
    time.sleep(3 + random.random())
    print('exiting job', flush=True)

for i in range(10):
    pool.job.put(job)
print('Jobs created!', flush=True)

Jobs created!Starting job
Starting job

Starting job
Starting job
exiting job
Starting job
exiting job
Starting job
exiting job
Starting job
exiting job
Starting job
exiting job
Starting job
exiting job
Starting job
exiting job
exiting job
exiting job
exiting job
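The hand-rolled `Pool` never stops its workers and leaves results on a queue for you to collect. For comparison, the sketch below runs similar jobs on `concurrent.futures.ThreadPoolExecutor`, which shuts down its threads and returns results in submission order through `map()`. The `job` function here is hypothetical: it sleeps 0.1 s and returns its argument squared.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def job(n):
    time.sleep(0.1)  # simulated blocking work
    return n * n

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
    # map() yields results in input order, even if jobs finish out of order
    results = list(executor.map(job, range(10)))
elapsed = time.perf_counter() - start  # the with block waited for all workers

print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```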


# Multiprocessing with Python

In this module, you'll learn:

- How to use the multiprocessing module 
- How to use multiprocessing's support for synchronization and communication

# Multiprocessing

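- API modeled on `threading` (`Process` mirrors `Thread`)
- Each process has its own interpreter and GIL, so no shared GIL, but also no shared memory without extra work
- Uses multiple cores well, even for pure-Python CPU-bound code
- Much more memory intensive than threads (each process is a full interpreter) and slower to start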

# Simple Multiprocessing example

In [24]:
from multiprocessing import Process, cpu_count

cpu_count()

16

In [27]:
%%file data/multi-test.py
from multiprocessing import Process, cpu_count
import time
import os

class MuchCPU(Process):
    def run(self):
        print(os.getpid())
        for i in range(20_000_000):
            result = i * i

if __name__ == '__main__':
    print('Running...')
    procs = [MuchCPU() for _ in range(cpu_count())]
    # procs = [MuchCPU(), MuchCPU()]
    t = time.time()
    for p in procs:
        p.start()
        # p.join()  # joining here would run the processes one at a time
    
    for p in procs:
        p.join()
    
    print('work took {} seconds'.format(time.time() - t))

Overwriting data/multi-test.py


In [28]:
!python data/multi-test.py

Running...
5167
5168
5169
5170
5171
5172
5173
5174
5175
5176
5177
5178
5179
5180
5181
5182
work took 27.10633397102356 seconds


In [29]:
%%file data/multi-test-2.py

from multiprocessing import Process, cpu_count
import time
import os

def target():
    print(os.getpid())
    for i in range(20_000_000):
        result = i * i

        
if __name__ == '__main__':
    print('Running...')
    procs = [Process(target=target) for _ in range(cpu_count())]
    t = time.time()
    for p in procs:
        p.start()
        # p.join()  # joining here would run the processes one at a time

    for p in procs:
        p.join()

    print('work took {} seconds'.format(time.time() - t))

Overwriting data/multi-test-2.py


In [30]:
!python data/multi-test-2.py

Running...
5184
5185
5186
5187
5188
5189
5190
5191
5192
5193
5194
5195
5196
5197
5198
5199
work took 8.994603157043457 seconds


# Pools

In [31]:
from multiprocessing import Pool

In [32]:
import time
import random

def much_cpu(n):
    # time.sleep(random.random())
    
    result = 0
    for i in range(n):
        result += i * i    
    return result

In [33]:
args_list = [2000] * 14
args_list

[2000,
 2000,
 2000,
 2000,
 2000,
 2000,
 2000,
 2000,
 2000,
 2000,
 2000,
 2000,
 2000,
 2000]

`multiprocessing.Pool` methods:

- `map(f, args)` => list
- `imap(f, args)` => iterator
- `imap_unordered(f, args)` => iterator, results in completion order

In [34]:
%%file data/mp-pool.py
from multiprocessing import Pool
import time
import random

def much_cpu(n):
    # time.sleep(random.random())
    
    result = 0
    for i in range(n):
        result += i * i    
    return result

args_list = [2000] * 20

if __name__ == '__main__':
    with Pool() as p:
        print(p.map(much_cpu, args_list))
        for result in p.imap(much_cpu, args_list):
            print(result)
        print(sum(p.imap_unordered(much_cpu, args_list)))

Overwriting data/mp-pool.py


In [38]:
!python data/mp-pool.py

[2664667000, 2664667000, 2664667000, 2664667000, 2664667000, 2664667000, 2664667000, 2664667000, 2664667000, 2664667000, 2664667000, 2664667000, 2664667000, 2664667000, 2664667000, 2664667000, 2664667000, 2664667000, 2664667000, 2664667000]
2664667000
2664667000
2664667000
2664667000
2664667000
2664667000
2664667000
2664667000
2664667000
2664667000
2664667000
2664667000
2664667000
2664667000
2664667000
2664667000
2664667000
2664667000
2664667000
2664667000
53293340000


In [39]:
import multiprocessing
p = multiprocessing.Pool()

foo bar


In [40]:
ar = p.apply_async(print, args=('foo', 'bar'))

In [41]:
ar.get()
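`apply_async` returns an `AsyncResult`. Its `get()` blocks until the worker finishes and then returns the function's value, which is `None` in the cell above because `print` returns nothing. The sketch below uses builtins, which pickle by reference and therefore work under any start method. Passing `timeout` makes `get()` raise `multiprocessing.TimeoutError` instead of waiting forever. The pool size of 2 is an arbitrary choice.

```python
import multiprocessing

if __name__ == '__main__':
    with multiprocessing.Pool(processes=2) as pool:
        ar = pool.apply_async(pow, args=(2, 10))  # runs pow(2, 10) in a worker
        value = ar.get(timeout=10)                 # blocks until the result arrives
        done = ar.ready()                          # True once the call has finished
        absolutes = pool.map(abs, [-1, -2, 3])
    print(value, done, absolutes)  # 1024 True [1, 2, 3]
```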

In [42]:
from multiprocessing.pool import ThreadPool

In [43]:
args_list = [2000] * 20

with ThreadPool(processes=16) as tp:  # a new name, so `p` still refers to the process pool
    for result in tp.imap_unordered(much_cpu, args_list):
        print(result)

2664667000
2664667000
2664667000
2664667000
2664667000
2664667000
2664667000
2664667000
2664667000
2664667000
2664667000
2664667000
2664667000
2664667000
2664667000
2664667000
2664667000
2664667000
2664667000
2664667000


In [44]:
p.close()  # stop accepting new tasks
p.join()   # wait for the workers to exit

# Multiprocess synchronization and communication

- Lock, Condition, Semaphore, Event
- Queue & Pipe
- Shared Memory
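`Process` has no return value of its own, so results usually come back over a `Queue` or a `Pipe`. The sketch below shows both. The function names and the `None` sentinel convention are hypothetical choices. It drains the queue before calling `join()`, because joining a process that still has items buffered on a queue can deadlock.

```python
import multiprocessing

def producer(q, n):
    # Runs in a child process; items are pickled onto the queue
    for i in range(n):
        q.put(i * i)
    q.put(None)  # sentinel: no more items

def echo_upper(conn):
    # Receives strings over one end of a Pipe and sends replies back
    while True:
        msg = conn.recv()
        if msg is None:
            break
        conn.send(msg.upper())
    conn.close()

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=producer, args=(q, 5))
    p1.start()
    received = []
    while (item := q.get()) is not None:  # drain before join()
        received.append(item)
    p1.join()

    parent_conn, child_conn = multiprocessing.Pipe()
    p2 = multiprocessing.Process(target=echo_upper, args=(child_conn,))
    p2.start()
    replies = []
    for word in ['foo', 'bar']:
        parent_conn.send(word)
        replies.append(parent_conn.recv())
    parent_conn.send(None)  # tell the child to exit
    p2.join()

    print(received, replies)  # [0, 1, 4, 9, 16] ['FOO', 'BAR']
```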

### Multiprocessing basic shared memory

In [None]:
%%time
import random
import multiprocessing

ROWS = 1_000_000
COLS = 16

X = [random.random() for i in range(ROWS * COLS)]
Y = [random.random() for i in range(ROWS * COLS)]

In [None]:
def mularray(X, Y):
    return [x * y for x, y in zip(X, Y)]

Run #1: single-process

In [None]:
%%time
Z = mularray(X, Y)

Run #2: multiprocess (pool)

In [None]:
offsets = [c * ROWS for c in range(COLS)]

offsets

In [None]:
%%file data/mp-pmul.py
import time
import random
import multiprocessing

ROWS = 1_000_000
COLS = 16

pc_begin = time.perf_counter()

X = [random.random() for i in range(ROWS * COLS)]
Y = [random.random() for i in range(ROWS * COLS)]
offsets = [c * ROWS for c in range(COLS)]

pc_alloc = time.perf_counter()

def mularray(X, Y):
    return [x * y for x, y in zip(X, Y)]

def pmul(offset):
    return mularray(
        X[offset:offset + ROWS], 
        Y[offset:offset + ROWS],
    )

if __name__ == '__main__':
    Z = []
    with multiprocessing.Pool() as pool:
        for Zpart in pool.imap(pmul, offsets):
            Z += Zpart
    pc_end = time.perf_counter()
    print(f'Allocation in {pc_alloc - pc_begin}s')
    print(f'Work done in {pc_end - pc_alloc}s')

In [None]:
!time python data/mp-pmul.py

Run #3: multiprocess (pool), shared memory

In [None]:
%%file data/mp-pmul-sm.py
import time
import random
import multiprocessing

ROWS = 1_000_000
COLS = 16

pc_begin = time.perf_counter()
X = [random.random() for i in range(ROWS * COLS)]
Y = [random.random() for i in range(ROWS * COLS)]
offsets = [c * ROWS for c in range(COLS)]

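# 'd' (C double) matches Python floats; 'f' (C float) would round to single precision.
# Pool workers share these arrays only when they are forked from this process.
sX = multiprocessing.Array('d', X, lock=False)
sY = multiprocessing.Array('d', Y, lock=False)
sZ = multiprocessing.Array('d', ROWS * COLS, lock=False)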
pc_alloc = time.perf_counter()

def pmul(offset):
    for i in range(offset, offset + ROWS):
        sZ[i] = sX[i] * sY[i]
      
if __name__ == '__main__':
    with multiprocessing.Pool() as pool:
        for x in pool.imap_unordered(pmul, offsets):
            pass
    pc_end = time.perf_counter()
    print(f'Allocation in {pc_alloc - pc_begin}s')
    print(f'Work done in {pc_end - pc_alloc}s')

In [None]:
!time python data/mp-pmul-sm.py
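Run #3 passes `lock=False` because each worker writes a disjoint slice of `sZ`. When processes update the same location, you need a lock. The default synchronized wrapper (`lock=True`) provides one through `get_lock()`, and `+=` on `.value` is not atomic without it. This is a sketch with hypothetical names and counts.

```python
import multiprocessing

def add_many(total, n):
    for _ in range(n):
        with total.get_lock():  # the lock attached by lock=True (the default)
            total.value += 1

if __name__ == '__main__':
    total = multiprocessing.Value('i', 0)  # shared C int, synchronized
    procs = [multiprocessing.Process(target=add_many, args=(total, 10_000))
             for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(total.value)  # 40000
```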

Run #4: multiprocess (pool), `shared_memory` (`ShareableList`)

In [None]:
%%file data/mp-pmul-sm2.py
import time
import random
import multiprocessing
from multiprocessing import shared_memory
from multiprocessing.managers import SharedMemoryManager

pc_begin = time.perf_counter()

ROWS = 1_000_000
COLS = 16
offsets = [c * ROWS for c in range(COLS)]

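def pmul(offset):
    # X, Y and Z are assigned in the __main__ block below; the workers can see
    # them only when the pool forks its worker processes from this one
    for i in range(offset, offset + ROWS):
        Z[i] = X[i] * Y[i]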

if __name__ == '__main__':
    with SharedMemoryManager() as smm:
        X = smm.ShareableList([random.random() for i in range(ROWS * COLS)])
        Y = smm.ShareableList([random.random() for i in range(ROWS * COLS)])
        Z = smm.ShareableList([0] * (ROWS * COLS))
        pc_alloc = time.perf_counter()

        with multiprocessing.Pool() as pool:
            for x in pool.imap_unordered(pmul, offsets):
                pass

        pc_end = time.perf_counter()
        print(f'Allocation in {pc_alloc - pc_begin}s')
        print(f'Work done in {pc_end - pc_alloc}s')

In [None]:
!time python data/mp-pmul-sm2.py

### Aside: if you _really_ want it fast, just use numpy (or better yet, numpy + multithreading)

In [None]:
!pip install numpy

In [None]:
import numpy as np

In [None]:
X = np.random.random(ROWS * COLS)
Y = np.random.random(ROWS * COLS)

In [None]:
%%time
Z0 = X * Y

In [None]:
%%time
from multiprocessing.pool import ThreadPool

offsets = [c * ROWS for c in range(COLS)]
Z1 = np.zeros_like(X)

def pmul_numpy(offset):
    Z1[offset:offset+ROWS] = X[offset:offset+ROWS] * Y[offset:offset+ROWS]

with ThreadPool(processes=16) as p:
    for result in p.imap_unordered(pmul_numpy, offsets):
        pass

In [None]:
(Z0 == Z1).all()

In [None]:
%%time
import time
from multiprocessing import Pool
from multiprocessing.managers import SharedMemoryManager

offsets = [c * ROWS for c in range(COLS)]

def pmul_numpy(offset):
    # aX, aY and aZ are globals set below; forked workers inherit them (and their shared buffers)
    aZ[offset:offset+ROWS] = aX[offset:offset+ROWS] * aY[offset:offset+ROWS]

with SharedMemoryManager() as smm:
    pc_begin = time.perf_counter()
    sX = smm.SharedMemory(X.nbytes)
    sY = smm.SharedMemory(X.nbytes)
    sZ = smm.SharedMemory(X.nbytes)
    aX = np.ndarray(X.shape, dtype=X.dtype, buffer=sX.buf)
    aY = np.ndarray(X.shape, dtype=X.dtype, buffer=sY.buf)
    aZ = np.ndarray(X.shape, dtype=X.dtype, buffer=sZ.buf)
    aX[:] = X[:]
    aY[:] = Y[:]
    pc_alloc = time.perf_counter()

    with Pool(processes=16) as p:
        for result in p.imap_unordered(pmul_numpy, offsets):
            pass
    pc_end = time.perf_counter()

    print(f'Allocation in {pc_alloc - pc_begin}s')
    print(f'Work done in {pc_end - pc_alloc}s')
    print((aZ == Z0).all())

# Lab

Open [concurrency lab][multithreading-multiprocessing-lab]

[multithreading-multiprocessing-lab]: ./multithreading-multiprocessing-lab.ipynb