# Multithreading and Multiprocessing

In this module, you'll learn

- How much (or little) concurrency you can get with Python threads
- How to use synchronization primitives with threading
- How to use the multiprocessing module to better utilize multicore machines


# Threading: the Global Interpreter Lock

- Only one thread at a time executes Python bytecode
- C extensions can release the GIL while they work
  - I/O libraries, NumPy, etc.
- Python threads are real OS threads
- CPU-bound threads get little or no speedup on multicore systems, and can even run slower than serial code
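To see the first and last bullets in action, here is a sketch that times a pure-Python, CPU-bound loop. It runs the loop twice serially, then runs the same two calls in two threads. `count_down` and `N` are illustrative names that are not part of the original. On a standard CPython build (with the GIL), expect the threaded time to be about the same as, or worse than, the serial time. Exact numbers depend on the machine and Python version.

```python
import threading
import time

def count_down(n):
    # Pure-Python busy loop: it needs the GIL for every bytecode it runs
    while n > 0:
        n -= 1

N = 2_000_000

# Two calls, one after the other
start = time.perf_counter()
count_down(N)
count_down(N)
serial = time.perf_counter() - start

# The same two calls, in two threads at once
start = time.perf_counter()
threads = [threading.Thread(target=count_down, args=(N,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
threaded = time.perf_counter() - start

print(f'serial:   {serial:.2f} s')
print(f'threaded: {threaded:.2f} s')
```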

# Python Threads

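`threading.Thread(target=None, args=(), kwargs=None)` (pass these by keyword)
- `target`: the callable to run in the new thread
- `args`, `kwargs`: positional and keyword arguments passed to `target`
- Alternatively, subclass `Thread` and override `run()`; either way, call `start()` to launch the thread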
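Here is a minimal sketch of the constructor arguments. Pass everything by keyword, because the first positional parameter of `Thread` is actually `group`, not `target`. Give each thread a container to write its result into instead of using a global. `power` and `powers` are hypothetical names.

```python
import threading

def power(base, results, exponent=2):
    # Each thread writes a different key, so no entry is overwritten
    results[base] = base ** exponent

powers = {}
threads = [
    # target/args/kwargs passed by keyword
    threading.Thread(target=power, args=(n, powers), kwargs={'exponent': 3})
    for n in range(5)
]
for t in threads:
    t.start()   # start() runs power() in a new thread
for t in threads:
    t.join()    # wait for every thread before reading the results

print(dict(sorted(powers.items())))  # {0: 0, 1: 1, 2: 8, 3: 27, 4: 64}
```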


## Basic threading example

In [1]:
import threading

class MyThread(threading.Thread):
    
    def run(self):
        self.text = input('Enter something: ')
        
input('Press enter to begin: ')
thread = MyThread()
thread.start()

count, result = 0, 0

# keep squaring successive integers until the input thread finishes
while thread.is_alive():
    count += 1
    result = count * count
    
print(f'calculated squares up to {count} * {count} = {result:,} while you typed {thread.text}')

Press enter to begin: 
Enter something: foo
calculated squares up to 594823 * 594823 = 353,814,401,329 while you typed foo


## Classless threading

In [2]:
def target():
    global text
    text = input('Enter something: ')
    
input('Press enter to begin: ')
thread = threading.Thread(target=target)
thread.start()

count, result = 0, 0

# keep squaring successive integers until the input thread finishes
while thread.is_alive():
    count += 1
    result = count * count

print(f'calculated squares up to {count} * {count} = {result:,} while you typed {text}')

Press enter to begin: 
Enter something: 
calculated squares up to 205432 * 205432 = 42,202,306,624 while you typed 


# Threading for performance (?!)

In [3]:
import time
import json
from urllib.request import urlopen
from urllib.parse import urlencode

APPID = '10d4440bbaa8581bb8da9bd1fbea5617'
UNITS = 'metric'

cities = [
    'Boulder', 'Atlanta', 'San Francisco',
    'Reno', 'Honolulu', 'Zurich', 'Dubai',
    'Dublin','Stuttgart', 'Rome', 'Singapore', 
    'Bangalore', 'Hyderabad',
]

def get_temp(city, temps, units=UNITS, appid=APPID):
    qs = urlencode({
        'q': city,
        'units': units,
        'appid': appid,
    })
    url = f'http://api.openweathermap.org/data/2.5/weather?{qs}'
    with urlopen(url) as resp:
        data = json.load(resp)
        temps[city] = data['main']['temp']
        
def run_serially():
    for city in cities:
        get_temp(city, temps)
        
def run_threaded():
    # Create the threads
    threads = [
        threading.Thread(target=get_temp, args=(c, temps)) 
        for c in cities
    ]

    # start all threads
    for thread in threads:
        thread.start()  # start() runs get_temp in a new thread; run() would call it in this one

    # wait for all threads to complete
    for thread in threads:
        thread.join()

In [4]:
%%time
temps = {}

run_serially()

for k, v in sorted(temps.items()):
    print(f'it is {v:.0f}°C in {k}')

it is 9°C in Atlanta
it is 33°C in Bangalore
it is -3°C in Boulder
it is 23°C in Dubai
it is 6°C in Dublin
it is 21°C in Honolulu
it is 34°C in Hyderabad
it is 0°C in Reno
it is 16°C in Rome
it is 8°C in San Francisco
it is 32°C in Singapore
it is 9°C in Stuttgart
it is 6°C in Zurich
CPU times: user 21.6 ms, sys: 8.92 ms, total: 30.6 ms
Wall time: 1.69 s


In [5]:
%%time
temps = {}

run_threaded()

for k, v in sorted(temps.items()):
    print(f'it is {v:.0f}°C in {k}')

it is 9°C in Atlanta
it is 33°C in Bangalore
it is -3°C in Boulder
it is 23°C in Dubai
it is 6°C in Dublin
it is 21°C in Honolulu
it is 34°C in Hyderabad
it is 0°C in Reno
it is 16°C in Rome
it is 8°C in San Francisco
it is 32°C in Singapore
it is 9°C in Stuttgart
it is 6°C in Zurich
CPU times: user 11.8 ms, sys: 7.25 ms, total: 19.1 ms
Wall time: 102 ms
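The threaded run took 102 ms, compared with 1.69 s serially. Each request spends almost all of its time waiting on the network, and a thread blocked on a socket releases the GIL so the others can proceed. The standard library's `concurrent.futures.ThreadPoolExecutor` can create and join such threads for you. This minimal sketch uses a hypothetical `fake_fetch` that sleeps in place of the weather API call, so it runs offline.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_fetch(name):
    # Stand-in for a network request: time.sleep() releases the GIL while waiting
    time.sleep(0.2)
    return name, len(name)   # hypothetical result, for illustration only

names = ['Boulder', 'Atlanta', 'Reno', 'Zurich', 'Dubai']

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(names)) as executor:
    # map() yields results in input order, whatever order the threads finish in
    lengths = dict(executor.map(fake_fetch, names))
elapsed = time.perf_counter() - start

print(lengths)
print(f'{elapsed:.2f} s')  # expect roughly 0.2 s, not the 1.0 s a serial loop would take
```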


# Synchronization primitives

Like other threading libraries, Python has support for `Lock`s, `Event`s, and `Semaphore`s:

In [6]:
lock = threading.Lock()
lock.acquire()
lock.release()

In [7]:
# Better: the lock is released when the block exits, even if it raises
lock = threading.Lock()
with lock:
    print('Do things with lock locked, will be released after block')

Do things with lock locked, will be released after block
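The GIL does not make compound operations such as `counter += 1` atomic. A thread can be switched out between reading the old value and writing the new one, which loses another thread's update. In this sketch, four threads share a counter, and a `Lock` guards each increment. `counter`, `counter_lock`, and `add_many` are hypothetical names.

```python
import threading

counter = 0
counter_lock = threading.Lock()

def add_many(n):
    global counter
    for _ in range(n):
        # read-modify-write done entirely while holding the lock
        with counter_lock:
            counter += 1

threads = [threading.Thread(target=add_many, args=(100_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 400000
```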


In [8]:
sem = threading.Semaphore(4)
sem.acquire()
sem.acquire()
sem.acquire()
sem.acquire()
# sem.acquire() would block
sem.release()

In [9]:
# Better: each `with` releases its acquisition when the block exits, even if it raises
sem = threading.Semaphore(4)
with sem:
    with sem:
        with sem:
            with sem:
                print('Do things with semaphore acquired, will be released after block')

Do things with semaphore acquired, will be released after block
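An `Event` lets one thread block until another thread signals it. In this minimal sketch, a worker waits in `Event.wait()` until the main thread calls `Event.set()`. `ready`, `waiter`, and `log` are hypothetical names.

```python
import threading
import time

ready = threading.Event()
log = []

def waiter():
    # Blocks until another thread calls ready.set(); gives up after 5 s
    if ready.wait(timeout=5):
        log.append('waiter saw the event')

t = threading.Thread(target=waiter)
t.start()
time.sleep(0.1)                 # simulate setup work in the main thread
log.append('main thread setting event')
ready.set()                     # wakes every thread waiting on the event
t.join()

print(log)  # ['main thread setting event', 'waiter saw the event']
```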


### Threadsafe queue class

If your threads pass data to each other rather than _share_ it, you can use a `queue.Queue`, which does its own locking:

In [10]:
import queue
import threading 

def worker(q):
    while True:
        value = q.get()
        print('I got a {}'.format(value))
        if value is None:
            break

q = queue.Queue()
thd = threading.Thread(target=worker, args=(q, ))
thd.start()        

In [11]:
q.put('Hello there')

I got a Hello there


In [12]:
q.put('General Kenobi')

I got a General Kenobi


In [13]:
q.put(None)

I got a None
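Besides `get()` and `put()`, `queue.Queue` keeps track of unfinished work. Each `task_done()` call marks one item as handled, and `Queue.join()` blocks until every item put so far has been marked. This sketch uses several hypothetical `doubler` workers, and each one is stopped by its own `None` sentinel.

```python
import queue
import threading

def doubler(jobs, out):
    while True:
        item = jobs.get()
        if item is None:          # sentinel: no more work for this thread
            jobs.task_done()
            break
        out.put(item * 2)
        jobs.task_done()          # this item is fully handled

jobs = queue.Queue()
out = queue.Queue()
workers = [threading.Thread(target=doubler, args=(jobs, out)) for _ in range(3)]
for w in workers:
    w.start()

for n in range(10):
    jobs.put(n)
jobs.join()          # returns once task_done() was called for every item put

for _ in workers:
    jobs.put(None)   # one sentinel per worker thread
for w in workers:
    w.join()

doubled = sorted(out.get() for _ in range(10))
print(doubled)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```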


### Simple thread pool

In [20]:
class Pool:
    def __init__(self, count):
        self.count = count
        self.job = queue.Queue()      # callables waiting to run
        self.result = queue.Queue()   # return values or raised exceptions
        self.threads = [
            # daemon threads don't keep the interpreter alive at exit
            threading.Thread(target=self.worker, daemon=True)
            for _ in range(count)
        ]
        for t in self.threads:
            t.start()

    def worker(self):
        # each worker loops forever, pulling the next job off the queue
        while True:
            job = self.job.get()
            try:
                result = job()
            except Exception as err:
                self.result.put(err)
            else:
                self.result.put(result)
                

In [21]:
import time

pool = Pool(4)

def job():
    print('Starting job')
    time.sleep(1)
    print('exiting job')

for i in range(10):
    pool.job.put(job)

Starting job
Starting job
Starting jobStarting job

exiting jobexiting job
Starting job
exiting job
Starting job

Starting job
exiting job
Starting job
exiting jobexiting job
Starting job
exiting job
Starting job

exiting job
exiting jobexiting job
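The standard library's `concurrent.futures.ThreadPoolExecutor` provides a ready-made version of this pool. Results and exceptions do not go onto one shared queue. Instead, each `submit()` returns a `Future`, and its `result()` either returns the value or re-raises the job's exception. This sketch uses a hypothetical `square_job` that fails on one input to show the error path.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def square_job(n):
    # Hypothetical job: short sleep, then fails for n == 3 to show error handling
    time.sleep(0.1)
    if n == 3:
        raise ValueError(f'bad input {n}')
    return n * n

outcomes = {}
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(square_job, n): n for n in range(6)}
    for fut in as_completed(futures):   # yields futures as they finish
        n = futures[fut]
        try:
            outcomes[n] = fut.result()  # re-raises the job's exception, if any
        except ValueError as err:
            outcomes[n] = err

print({n: outcomes[n] for n in sorted(outcomes)})
```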



# Multiprocessing with Python

In this module, you'll learn

- How to use the multiprocessing module 
- How to use multiprocessing's support for synchronization and communication

# Multiprocessing

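- API modeled on `threading`: `Process` takes the same `target`/`args`/`kwargs` as `Thread`
- Each process has its own interpreter and its own GIL; memory is not shared unless you use explicit shared-memory objects
- Can keep every core busy with CPU-bound Python code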

# Simple Multiprocessing example

In [24]:
from multiprocessing import Process, cpu_count
import time
import os

class MuchCPU(Process):
    def run(self):
        print(os.getpid())
        print(__name__)
        for i in range(20_000_000):
            result = i * i

if __name__ == '__main__':
    print('Running...')
    # one process per two logical CPUs (at least one)
    procs = [MuchCPU() for _ in range(max(1, cpu_count() // 2))]
    t = time.time()
    for p in procs:
        p.start()
        # calling p.join() here would wait for each process before starting the next
    
    for p in procs:
        p.join()
    
    print('work took {} seconds'.format(time.time() - t))

Running...
69644
__main__
69645
69646
__main__
__main__
69647
__main__
work took 1.3642287254333496 seconds


# Multiprocess synchronization and communication

- Lock, Condition, Semaphore, Event
- Queue & Pipe
- Shared Memory

### Multiprocessing basic shared memory

In [25]:
%%time
import random
import multiprocessing

ROWS = 100_000
COLS = 8

X = [random.random() for i in range(ROWS * COLS)]
Y = [random.random() for i in range(ROWS * COLS)]

CPU times: user 202 ms, sys: 18.8 ms, total: 221 ms
Wall time: 220 ms


In [26]:
def mularray(X, Y):
    return [x * y for x, y in zip(X, Y)]

Run #1: single-process

In [27]:
%%time
Z = mularray(X, Y)

CPU times: user 62 ms, sys: 13.3 ms, total: 75.3 ms
Wall time: 74.3 ms


In [28]:
Z[:10]

[0.038740960268935615,
 0.2978505778944264,
 0.012724141542220525,
 0.41284384286938475,
 0.5828760953526163,
 0.07589645731154566,
 0.369558748926234,
 0.015927502038287834,
 0.022490655256077862,
 0.0006157911262867638]

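Run #2: multiprocess (pool). Only an offset is sent to each worker, because the workers are forked from this process and so already have `X` and `Y`. However, each 100,000-element result list is pickled back to the parent, and starting the pool takes time. For work this cheap, that overhead outweighs the parallel speedup.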

In [29]:
%%time
offsets = [c * ROWS for c in range(COLS)]

def pmul(offset):
    return mularray(
        X[offset:offset + ROWS], 
        Y[offset:offset + ROWS],
    )

Z = []
with multiprocessing.Pool() as pool:
    for Zpart in pool.map(pmul, offsets):
        Z += Zpart
    

CPU times: user 64.2 ms, sys: 49.3 ms, total: 114 ms
Wall time: 161 ms


In [None]:
Z[:10]

Run #3: multiprocess (pool), shared memory

In [30]:
offsets

[0, 100000, 200000, 300000, 400000, 500000, 600000, 700000]

In [32]:
# 'd' = C double, the same precision as Python floats ('f' would round to single precision)
sX = multiprocessing.Array('d', X)
sY = multiprocessing.Array('d', Y)
sZ = multiprocessing.Array('d', ROWS * COLS)

In [33]:
%%time
def pmul(offset):
    # every sX[i], sY[i] and sZ[i] access acquires that array's lock
    for i in range(offset, offset + ROWS):
        sZ[i] = sX[i] * sY[i]
        
with multiprocessing.Pool() as pool:
    pool.map(pmul, offsets)

CPU times: user 18.8 ms, sys: 29.5 ms, total: 48.2 ms
Wall time: 6.17 s


### Aside: if you _really_ want it fast, just use numpy (or better yet, numpy + multiprocessing)

In [34]:
import numpy as np

In [35]:
# 80,000 elements: one tenth of the ROWS * COLS = 800,000 used above
X = np.random.random(80000)
Y = np.random.random(80000)

In [36]:
%%time
Z = X * Y

CPU times: user 9.63 ms, sys: 5.76 ms, total: 15.4 ms
Wall time: 14.3 ms


# Lab

Open [concurrency lab][multithreading-multiprocessing-lab]

[multithreading-multiprocessing-lab]: ./multithreading-multiprocessing-lab.ipynb