# Notes on Python [Concurrency](https://docs.python.org/3.7/library/concurrency.html)

## threading

The CPython interpreter uses the global interpreter lock (GIL) to ensure that only one thread executes Python bytecode at a time.

Python source code is compiled to bytecode (.py -> .pyc), an intermediate language run by the Python virtual machine. The VM is itself native machine code (the interpreter loop) that executes each bytecode instruction. Bytecode can be inspected with the `dis` module.

module.py text -> tokens -> AST -> bytecode -> interpreted by the VM
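A minimal sketch of the pipeline above using the standard library's `tokenize`, `ast`, `compile()`, and `dis`. The one-line `source` string is a made-up example module, not something from these notes.

```python
import ast
import dis
import io
import tokenize

source = "x = 1 + 2\nprint(x)\n"  # hypothetical module text

# text -> tokens
tokens = list(tokenize.generate_tokens(io.StringIO(source).readline))
print([tok.string for tok in tokens[:5]])  # ['x', '=', '1', '+', '2']

# tokens -> AST (ast.parse tokenizes internally)
tree = ast.parse(source)
print(ast.dump(tree.body[0]))

# AST -> code object holding the bytecode
code = compile(tree, "<example>", "exec")

# the bytecode listing the VM will interpret (opcode names vary by version)
dis.dis(code)
```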

The module is loosely based on Java threading, but there are no priorities and no thread groups, and threads cannot be destroyed, stopped, suspended, resumed, or interrupted from outside.
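Since a thread cannot be stopped from outside, the usual workaround is cooperative cancellation: the thread polls a flag and exits on its own. A minimal sketch, assuming a `threading.Event` as the flag; `worker` and `ticks` are hypothetical names.

```python
import threading
import time

stop = threading.Event()  # shared "please stop" flag
ticks = []                # hypothetical record of work done

def worker():
    # The thread cannot be killed from outside, so it checks the flag itself
    while not stop.is_set():
        ticks.append(time.monotonic())
        stop.wait(0.01)   # sleep up to 10 ms, waking early if stop is set

t = threading.Thread(target=worker)
t.start()
time.sleep(0.1)
stop.set()                # ask the worker to finish
t.join(timeout=1)
print(t.is_alive(), len(ticks) > 0)  # False True
```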

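"Executed atomically" means one thread at a time, uninterrupted. In CPython, thread switches happen between bytecode instructions, so simple operations on built-in types that compile to a single instruction (e.g. `list.append`) behave atomically, while a statement like `counter += 1` compiles to several instructions and can be interrupted partway through.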
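A small sketch of why `counter += 1` is not atomic: `dis` shows it compiles to separate load, add, and store instructions, so a thread switch can land between them. Wrapping the update in a `Lock` makes the final count deterministic. Exact opcode names vary between Python versions; `increment` and `worker` are hypothetical names.

```python
import dis
import threading

counter = 0

def increment():
    global counter
    counter += 1  # load, add, store: a read-modify-write sequence

# One source line, several bytecode instructions
ops = [ins.opname for ins in dis.get_instructions(increment)]
print(ops)

lock = threading.Lock()

def worker(n):
    for _ in range(n):
        with lock:        # makes the whole read-modify-write exclusive
            increment()

threads = [threading.Thread(target=worker, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 40000
```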

`Thread`

In [1]:
from threading import Thread
import threading
import time

In [2]:
def f():
    thread = threading.current_thread()
    print(f"Thread ({thread.name}) ({thread.ident}) function execution begins", end=" ")
    for i in range(3):
        time.sleep(0.25)
        print(".", end=" ")
    time.sleep(0.25)
    print("And Done!")

In [3]:
f()

Thread (MainThread) (4583409152) function execution begins . . . And Done!


Same function, different thread name and id from main.

In [4]:
Thread(target=f).start()

Thread (Thread-5 (f)) (123145551855616) function execution begins . . . And Done!


Can also subclass `Thread` and override `run()`:

In [5]:
class MyThread(Thread):
    def run(self):
        return f()

In [6]:
MyThread().start()

Thread (Thread-6) (123145551855616) function execution begins . . . And Done!


### Sync Primitives

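All primitives that have `acquire()` and `release()` methods (`Lock`, `RLock`, `Condition`, `Semaphore`, `BoundedSemaphore`) can be used as context managers in `with` blocks.

Use locks to prevent race conditions and to control access to shared state.

Locks can introduce deadlocks: for example, two threads each hold a lock the other needs, or a thread never releases a lock, and the waiting threads block forever.

Instead of blocking indefinitely, `acquire(blocking=False)` returns a boolean immediately, and `acquire(timeout=...)` returns a boolean once the lock is acquired or the timeout expires.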
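A minimal sketch of the non-blocking and timed forms of `acquire()` on a lock that is already held:

```python
import threading

lock = threading.Lock()
lock.acquire()  # the lock is now held

# Non-blocking: returns False at once because the lock is taken
got_now = lock.acquire(blocking=False)
# Timed: waits up to 0.1 s, then gives up and returns False
got_later = lock.acquire(timeout=0.1)

lock.release()
print(got_now, got_later)  # False False
```

Because `Lock` is not reentrant, even the thread holding it gets `False` here; an `RLock` would succeed.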

`Lock`

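Not owned by any one thread: a locked `Lock` can be released from any thread, not just the one that acquired it.

Lowest-level synchronization primitive available in Python, implemented directly by the `_thread` extension module.

Two states, locked and unlocked, switched with `acquire()` and `release()`.

Same as a mutex (mutual exclusion).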

In [7]:
from threading import Lock
lock = Lock()
lock.locked()

False

In [8]:
from string import ascii_lowercase
def messy_printing(lock):
    t = threading.current_thread()
    color = '\033[94m' if t.name == "blue" else '\033[91m'
    def print_alphabet():
        for letter in ascii_lowercase:
            print(color + letter, end="")
            time.sleep(0.01)
    if lock:
        with lock:
            print_alphabet()
    else:
        print_alphabet()

In [9]:
t1 = Thread(target=messy_printing, args=(None,), name="blue")
t2 = Thread(target=messy_printing, args=(None,), name="red")
t1.start()
t2.start()

[94ma[91ma[94mb[91mb[91mc[94mc[91md[94md[91me[94me[91mf[94mf[91mg[94mg[91mh[94mh[91mi[94mi[91mj[94mj[91mk[94mk[91ml[94ml[91mm[94mm[91mn[94mn[94mo[91mo[94mp[91mp[94mq[91mq[94mr[91mr[94ms[91ms[94mt[91mt[91mu[94mu[91mv[94mv[91mw[94mw[91mx[94mx[91my[94my[91mz[94mz

In [10]:
t1 = Thread(target=messy_printing, args=(lock,), name="blue")
t2 = Thread(target=messy_printing, args=(lock,), name="red")
t1.start()
t2.start()

[94ma[94mb[94mc[94md[94me[94mf[94mg[94mh[94mi[94mj[94mk[94ml[94mm[94mn[94mo[94mp[94mq[94mr[94ms[94mt[94mu[94mv[94mw[94mx[94my[94mz[91ma[91mb[91mc[91md[91me[91mf[91mg[91mh[91mi[91mj[91mk[91ml[91mm[91mn[91mo[91mp[91mq[91mr[91ms[91mt[91mu[91mv[91mw[91mx[91my[91mz

`RLock`

Same as `Lock`, but the owning thread can acquire it multiple times, and it must release it as many times as it acquired it before the lock becomes unlocked and another thread can acquire it. Unlike `Lock`, only the owning thread may release it.
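A minimal sketch of reentrancy: a function holding an `RLock` calls another function that acquires the same lock. `outer` and `inner` are hypothetical names.

```python
import threading

rlock = threading.RLock()

def inner():
    with rlock:          # same thread acquires again: recursion level 2
        return "re-entered"

def outer():
    with rlock:          # first acquisition: recursion level 1
        return inner()   # with a plain Lock this call would deadlock

print(outer())  # re-entered
```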

`Event`

A thread-safe boolean flag: `.set()`, `.is_set()`, `.wait()`, `.clear()`. `wait()` blocks until another thread calls `set()` (or an optional timeout expires).
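A minimal sketch of one thread waiting on an `Event` that the main thread sets; `waiter` and `results` are hypothetical names.

```python
import threading

ready = threading.Event()
results = []

def waiter():
    ready.wait()           # block until another thread calls set()
    results.append("go")

t = threading.Thread(target=waiter)
t.start()
ready.set()                # wakes every thread waiting on the event
t.join()
print(results, ready.is_set())  # ['go'] True
```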

`Condition`

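A lock (an `RLock` by default) plus `wait()`/`notify()`: threads wait until another thread signals that some shared state has changed. Roughly an `Event` combined with a `Lock` that controls access to the resource.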
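A minimal sketch of a `Condition` guarding a shared deque: the consumer waits for a predicate, and the producer notifies after changing the state. The names `items` and `consumed` are hypothetical.

```python
import threading
from collections import deque

items = deque()
cond = threading.Condition()   # wraps an RLock by default
consumed = []

def consumer():
    with cond:
        # wait_for() releases the lock while waiting and re-checks the predicate on wakeup
        cond.wait_for(lambda: len(items) > 0)
        consumed.append(items.popleft())

t = threading.Thread(target=consumer)
t.start()
with cond:
    items.append(42)
    cond.notify()              # wake one waiting thread; the caller must hold the lock
t.join()
print(consumed)  # [42]
```

`wait_for()` checks the predicate before sleeping, so the sketch works even if the producer runs before the consumer starts waiting.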

`Semaphore`

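Extends `Lock` with an internal counter so that up to N threads can acquire it at once, limiting how many threads access a resource concurrently. `Semaphore(1)` behaves like a `Lock`; use `BoundedSemaphore` to get a `ValueError` if it is released more times than it was acquired.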
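A deterministic companion to the colored demo below: instead of eyeballing output, this sketch counts how many threads are inside the semaphore at once and records the peak. It also shows `BoundedSemaphore` rejecting an unmatched `release()`. The counters `active` and `peak` are hypothetical.

```python
import threading
import time

sem = threading.Semaphore(2)       # at most 2 holders at once
state_lock = threading.Lock()      # protects the two counters below
active = 0
peak = 0

def task():
    global active, peak
    with sem:
        with state_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)           # simulated work while holding a slot
        with state_lock:
            active -= 1

threads = [threading.Thread(target=task) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(peak)  # never more than 2

# BoundedSemaphore rejects a release() without a matching acquire()
bounded = threading.BoundedSemaphore(1)
try:
    bounded.release()
    extra_release_rejected = False
except ValueError:
    extra_release_rejected = True
print(extra_release_rejected)  # True
```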

In [11]:
from threading import Semaphore
from random import randint

In [12]:
def work(id_, cost, name, semaphore):
    color = f"\33[{30 + id_}m"
    def g():
        for _ in range(cost): 
            time.sleep(cost / 1000)
            print(color + name, end="")
    if semaphore:
        with semaphore:
            g()
    else:
        g()

In [13]:
def test_semaphore(count):
    semaphore = Semaphore(count) if count is not None else None
    n_tasks = 5
    for i, name in zip(range(n_tasks), ascii_lowercase):
        cost = randint(1, 20)
        Thread(target=work, args=(i, cost, name, semaphore)).start()

In [14]:
test_semaphore(None)

[32mc[30ma[32mc[34me[30ma[34me[30ma[33md[34me[30ma[34me[31mb[33md[34me[33md[31mb[33md[33md[31mb[33md[31mb[33md[33md[31mb[33md[33md[31mb[33md[31mb[31mb[31mb[31mb[31mb[31mb[31mb[31mb[31mb[31mb[31mb[31mb[31mb

In [15]:
test_semaphore(1)

[30ma[30ma[30ma[30ma[30ma[30ma[30ma[30ma[30ma[30ma[30ma[30ma[30ma[30ma[30ma[30ma[31mb[31mb[31mb[31mb[31mb[31mb[31mb[31mb[31mb[31mb[31mb[31mb[31mb[31mb[31mb[31mb[31mb[31mb[31mb[31mb[32mc[32mc[32mc[33md[33md[34me[34me[34me[34me[34me[34me[34me[34me[34me[34me[34me[34me[34me[34me[34me[34me[34me[34me[34me

In [16]:
test_semaphore(2)

[31mb[31mb[30ma[31mb[31mb[30ma[31mb[31mb[30ma[32mc[30ma[30ma[32mc[30ma[32mc[30ma[32mc[30ma[32mc[30ma[30ma[32mc[30ma[32mc[30ma[32mc[30ma[32mc[30ma[33md[32mc[33md[33md[33md[32mc[33md[33md[32mc[33md[34me[32mc[32mc[32mc[32mc[32mc

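`Barrier`

Each thread calling `wait()` blocks until `parties` threads have called it, then all are released together. The optional `action` callable is run by one of those threads before they are released.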

In [17]:
from threading import Barrier
import sys

In [18]:
def fire():
    print("Fire!")
def shoot(row=0):
    barrier.wait()
    distance = 50
    for left_spaces, right_spaces in zip(range(distance), reversed(range(distance))):
        l, r = " " * left_spaces, " " * right_spaces
        print(f"{l}->{r}", end="\r")
        sys.stdout.flush()
        time.sleep(0.05)

In [19]:
barrier = Barrier(parties=3, action=fire)

Not quite the beautiful synchronized animation I was hoping for, but I'm not going to take the time to figure out the terminal printing magic needed for it.

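Idea for a better version: one writer thread renders a shared (10, 72) character matrix to the screen as a string at a high sub-second frame rate, while each shooter thread updates its own row. With a (10, 72) shared matrix (or 10 separate row vectors), each row has only one thread writing it and one thread reading it.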

In [20]:
for i in range(3):
    time.sleep(0.2)
    print(".", end="")
    Thread(target=shoot, args=(i,)).start()

...Fire!
(Animation output omitted: the three threads each print `->` moving right across one line using `\r`, so the captured text is a jumble of overlapping arrows.)
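Since the animation output is hard to read, here is a deterministic sketch of the same `Barrier` behavior: the `action` runs once before any party is released, and each `wait()` returns a distinct index in `range(parties)`. The names `gate` and `runner` are hypothetical.

```python
import threading

released = []
gate = threading.Barrier(parties=3, action=lambda: released.append("action"))

def runner():
    index = gate.wait()     # blocks until all 3 parties have arrived
    released.append(index)  # each thread gets a distinct index in range(3)

threads = [threading.Thread(target=runner) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(released[0], sorted(released[1:]))  # action [0, 1, 2]
```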

`Timer`

In [21]:
from threading import Timer
Timer(3, f).start()
print("Waiting...")
time.sleep(1)
print("Still waiting...")

Waiting...
Still waiting...
Thread (Thread-25) (123145551855616) function execution begins . . . And Done!
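`Timer` is a `Thread` subclass that calls a function after a delay, and `cancel()` stops it if it has not fired yet. A minimal sketch with a hypothetical `fired` list:

```python
import threading

fired = []
timer = threading.Timer(0.5, fired.append, args=("fired",))
timer.start()
timer.cancel()   # only has an effect while the timer is still waiting
timer.join()
print(fired)     # []
```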


In [22]:
def heartbeat(interval):
    global sentinel
    heart_rate = 60                         # beats per minute
    heart_rate_interval = 60 / heart_rate   # seconds per beat
    print("ba bump...", end=" ")
    time.sleep(heart_rate_interval)
    if sentinel == 0:
        print("beeeeeeeeep.")
        return
    sentinel -= 1  # decrement before scheduling the next beat to avoid a race
    Timer(interval, heartbeat, (interval,)).start()

In [23]:
interval = 0.25
sentinel = 5
Timer(interval, heartbeat, (interval,)).start()

ba bump... ba bump... ba bump... ba bump... ba bump... ba bump... beeeeeeeeep.


## multiprocessing

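Start methods:

spawn - starts a fresh Python interpreter process; the child only inherits the resources needed to run the process object's `run()` method  
fork - copies the parent with `os.fork()`, so the child starts with a copy of the parent's memory; safely forking a multithreaded parent is problematic  
forkserver - starts a single-threaded server process once, which then forks each new child on request

Set the start method globally with `set_start_method()` (at most once per program), or locally with `get_context()`, which returns a context object with its own start method.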
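A minimal sketch of choosing a start method locally with `get_context()` instead of changing the global default. It assumes the code is run as a script (so the `__main__` guard applies and `square`, a hypothetical helper, is importable by the spawned workers).

```python
import multiprocessing as mp

def square(x):
    return x * x

# A context object carries its own start method without touching the global default
ctx = mp.get_context("spawn")

if __name__ == "__main__":
    with ctx.Pool(2) as pool:
        print(pool.map(square, [1, 2, 3]))  # [1, 4, 9]
```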

In [24]:
from multiprocessing import Pool

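`mp_f` is written to its own module file because `Pool` pickles the target function by reference (module and name), and each worker process must be able to import it. With the spawn start method, functions defined interactively in the notebook's `__main__` cannot be found by the workers.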
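A quick way to see the by-reference behavior: pickling a module-level function stores only its module and name, while a lambda has no importable name and fails to pickle. `json.dumps` is just a convenient importable function for the sketch.

```python
import json
import pickle

payload = pickle.dumps(json.dumps)
print(b"json" in payload and b"dumps" in payload)  # True: module and name only

try:
    pickle.dumps(lambda x: x)
    lambda_pickled = True
except (pickle.PicklingError, AttributeError):
    lambda_pickled = False
print(lambda_pickled)  # False
```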

In [25]:
%%file mp_f.py
import threading
import time
def mp_f(start_time):
    started = time.time()
    diff = started - start_time
    thread = threading.current_thread()
    print(f"{diff:0.03f}s Thread ({thread.name}) ({thread.ident}) function execution begins", end=" ")
    for i in range(3):
        time.sleep(0.5)
        print(".", end=" ")
    time.sleep(0.5)
    print("And Done!")

Overwriting mp_f.py


In [26]:
from mp_f import mp_f
n = 8
start_time = time.time()
print("0.000s - start")
with Pool(n) as p:
    print(f"{time.time() - start_time:0.03f}s - pool created")
    p.starmap(mp_f, [(start_time,)] * n)

0.000s - start
0.083s - pool created
0.103s Thread (MainThread) (4504147456) function execution begins . . . And Done!
0.111s Thread (MainThread) (4646553088) function execution begins . . . And Done!
0.115s Thread (MainThread) (4593587712) function execution begins . . . And Done!
0.122s Thread (MainThread) (4559005184) function execution begins . . . And Done!
0.128s Thread (MainThread) (4469634560) function execution begins . . . And Done!
0.129s Thread (MainThread) (4669572608) function execution begins . . . And Done!
0.137s Thread (MainThread) (4597757440) function execution begins . . . And Done!
0.140s Thread (MainThread) (4656297472) function execution begins . . . And Done!


### IPC

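Patterns: the producer/consumer paradigm, listeners and clients (`multiprocessing.connection.Listener` and `Client`), and FIFO, LIFO, or priority ordering.

`multiprocessing` offers `Queue`, `SimpleQueue`, and `JoinableQueue`; `JoinableQueue` adds `task_done()` and `join()` so a producer can wait until every item has been processed.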

In [27]:
%load_ext autoreload
%autoreload 2

In [28]:
from multiprocessing import Queue
queue = Queue()

In [29]:
%%file producer.py
import time
from random import randint
def producer(queue):
    print("Producer producing!")
    while True:
        time.sleep(1)
        work = randint(0, 5)
        print(f"Putting {work=} in queue")
        queue.put(work)
        if randint(0, 5) == 4:
            print("Magic number 8, gotta go!")
            queue.put("stop")
            break

Overwriting producer.py


In [30]:
from producer import producer
producer(queue)

Producer producing!
Putting work=1 in queue
Magic number 8, gotta go!


In [31]:
%%file consumer.py
import time
from random import randint
def consumer(queue):
    print("Consumer consuming!")
    while True:
        work = queue.get()
        if work == "stop":
            print("All done")
            break
        print(f"Handling {work=}", end=" ")
        for _ in range(work):
            time.sleep(work / 100)
            print(".", end=" ")
        print()

Overwriting consumer.py


In [32]:
from consumer import consumer
consumer(queue)

Consumer consuming!
Handling work=1 . 
All done


In [33]:
t = time.perf_counter()
producer_thread = Thread(target=producer, args=(queue,))
consumer_thread = Thread(target=consumer, args=(queue,))
producer_thread.start()
consumer_thread.start()
print(f"Startup time: {time.perf_counter() - t:0.04f}")
producer_thread.join()
consumer_thread.join()

Producer producing!
Consumer consuming!
Startup time: 0.0011
Putting work=2 in queue
Handling work=2 . . 
Putting work=3 in queue
Handling work=3 . . . 
Putting work=4 in queue
Handling work=4 . . . . 
Putting work=4 in queue
Handling work=4 . . . . 
Putting work=5 in queue
Handling work=5 . . . . . 
Putting work=1 in queue
Handling work=1 . 
Putting work=4 in queue
Handling work=4 . . . . 
Putting work=2 in queue
Handling work=2 . . 
Putting work=0 in queue
Handling work=0 
Putting work=0 in queue
Handling work=0 
Putting work=2 in queue
Handling work=2 . . 
Putting work=2 in queue
Magic number 8, gotta go!
Handling work=2 . . 
All done


In [34]:
from multiprocessing import Process

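Why does this print all of the producer's output before the consumer's? Most likely it is just output buffering rather than a problem with the processes: each child process has its own `sys.stdout`, which is block-buffered when it is not a terminal, so a child's output shows up in chunks (when the buffer fills or the process exits) instead of line by line. Passing `flush=True` to `print()` should interleave the two streams.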

In [36]:
t = time.perf_counter()
producer_proc = Process(target=producer, args=(queue,))
consumer_proc = Process(target=consumer, args=(queue,))
producer_proc.start()
consumer_proc.start()
print(f"Startup time: {time.perf_counter() - t:0.03f}")
producer_proc.join()
consumer_proc.join()

Startup time: 0.013
Producer producing!
Putting work=3 in queue
Putting work=1 in queue
Putting work=2 in queue
Putting work=2 in queue
Putting work=3 in queue
Putting work=0 in queue
Putting work=2 in queue
Putting work=3 in queue
Putting work=1 in queue
Putting work=4 in queue
Putting work=1 in queue
Putting work=5 in queue
Putting work=4 in queue
Putting work=1 in queue
Putting work=1 in queue
Putting work=5 in queue
Putting work=3 in queue
Putting work=3 in queue
Putting work=0 in queue
Putting work=2 in queue
Putting work=5 in queue
Putting work=2 in queue
Putting work=5 in queue
Putting work=3 in queue
Putting work=5 in queue
Putting work=1 in queue
Putting work=4 in queue
Magic number 8, gotta go!
Consumer consuming!
Handling work=3 . . . 
Handling work=1 . 
Handling work=2 . . 
Handling work=2 . . 
Handling work=3 . . . 
Handling work=0 
Handling work=2 . . 
Handling work=3 . . . 
Handling work=1 . 
Handling work=4 . . . . 
Handling work=1 . 
Handling work=5 . . . . . 
Handling

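`Pipe`, `Connection`

`Pipe()` returns a pair of `Connection` objects. A pipe is lower level than a queue: it joins exactly two endpoints, each of which can `send()` and `recv()` (duplex by default). Data may be corrupted if two processes or threads use the same end at the same time.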
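A minimal sketch of a duplex `Pipe`. In real use each end would be handed to a different `Process`; both ends stay in one process here only to keep the example short. The message contents are made up.

```python
from multiprocessing import Pipe

# Two Connection objects; with duplex=True (the default) both ends can send and recv
left, right = Pipe()
left.send({"task": "square", "value": 7})  # pickled on send
msg = right.recv()                        # unpickled on recv
right.send(msg["value"] ** 2)
answer = left.recv()
print(answer)  # 49
left.close()
right.close()
```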

Non-`multiprocessing` IPC

files, signals, sockets, shared memory

### Sync Primitives

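`multiprocessing` provides equivalents of all the `threading` primitives above: `Lock`, `RLock`, `Event`, `Condition`, `Semaphore`, `BoundedSemaphore`, and `Barrier`.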

### Shared State

In [37]:
from multiprocessing.shared_memory import SharedMemory
from multiprocessing.managers import SharedMemoryManager

In [38]:
import numpy as np

In [39]:
# display = np.random.randint(0, 256, size=(78, 78), dtype=np.uint8)
X = np.zeros((78, 78), dtype=np.uint8)

In [41]:
print(X)

[[0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 ...
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]]


In [42]:
%%file shm.py
from multiprocessing.shared_memory import SharedMemory
import numpy as np
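def work_on_shm(shm_name, shape, dtype):
    print("hey")
    shm = SharedMemory(shm_name)             # attach to the existing block by name
    arr = np.ndarray(shape, dtype, shm.buf)  # zero-copy view of the shared buffer
    print(arr.shape)
    del arr      # drop the view first, or close() raises BufferError
    shm.close()  # detach this process; the owner is responsible for unlink()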

Overwriting shm.py


In [43]:
from shm import work_on_shm

In [44]:
from concurrent.futures import ProcessPoolExecutor, as_completed

In [45]:
with ProcessPoolExecutor(8) as executor:
    with SharedMemoryManager() as manager:
        shm = manager.SharedMemory(X.nbytes)
        # NumPy view backed by the shared block; copy the data in
        shared_arr = np.ndarray(X.shape, X.dtype, shm.buf)
        shared_arr[:] = X[:]
        # Only the block's name, shape, and dtype are sent to the worker
        future = executor.submit(work_on_shm, shm.name, X.shape, X.dtype)
        future.result()

hey
(78, 78)


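Shared memory `ctypes` objects: `Value` and `Array` are allocated from shared memory (a private `mmap`-backed heap inside `multiprocessing`) and, by default, wrapped with a lock that synchronizes access.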
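A minimal sketch of `Value` and `Array` shared between processes. It assumes the code runs as a script (spawn re-imports the main module, hence the `__main__` guard); `work` is a hypothetical helper.

```python
from multiprocessing import Array, Process, Value

def work(counter, squares, i, n):
    for _ in range(n):
        # += on a shared Value is read-modify-write, so take its built-in lock
        with counter.get_lock():
            counter.value += 1
    squares[i] = i * i  # each process writes only its own slot

if __name__ == "__main__":
    counter = Value("i", 0)   # shared C int
    squares = Array("i", 4)   # shared C int array, zero-initialized
    procs = [Process(target=work, args=(counter, squares, i, 1000)) for i in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(counter.value, list(squares))  # 4000 [0, 1, 4, 9]
```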

`Manager()` starts a server process that holds Python objects and lets other processes manipulate them through proxies.
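A minimal sketch of a `Manager` sharing a dict and a `Namespace` across processes. It assumes script execution with a `__main__` guard; `record` is a hypothetical helper.

```python
from multiprocessing import Manager, Process

def record(shared, ns, key):
    shared[key] = key.upper()   # proxy call is forwarded to the manager process
    ns.last = key               # Namespace attributes live in the manager too

if __name__ == "__main__":
    with Manager() as manager:
        shared = manager.dict()
        ns = manager.Namespace()
        procs = [Process(target=record, args=(shared, ns, k)) for k in "abc"]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        result = shared.copy()  # plain dict copied out before the manager shuts down
        last = ns.last
    print(sorted(result.items()), last in "abc")
```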

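`Namespace`: created with `Manager().Namespace()`, a proxy object with no public methods whose attributes are shared through the manager's server process. IPython's `Namespace?` only finds it after an import such as `from multiprocessing.managers import Namespace`.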


https://docs.python.org/3/library/multiprocessing.shared_memory.html

### Pool (Advanced)

In [47]:
%%file fib.py
from functools import lru_cache

# @lru_cache(None)  # left disabled so fib stays CPU-bound for the benchmark
def fib(n):
    return n if n < 2 else fib(n - 1) + fib(n - 2)

Overwriting fib.py


In [48]:
from fib import fib

In [49]:
%timeit fib(33)

960 ms ± 32 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [50]:
%%time
with Pool(8) as p:
    print(p.map(fib, [33] * 8))

[3524578, 3524578, 3524578, 3524578, 3524578, 3524578, 3524578, 3524578]
CPU times: user 14.1 ms, sys: 39.2 ms, total: 53.3 ms
Wall time: 1.32 s


In [51]:
%%time
with ProcessPoolExecutor(max_workers=8) as executor:
    futures = executor.map(fib, [33] * 8)
    print(list(futures))

[3524578, 3524578, 3524578, 3524578, 3524578, 3524578, 3524578, 3524578]
CPU times: user 21.5 ms, sys: 53.9 ms, total: 75.4 ms
Wall time: 1.41 s


Do cool things

`Proxy` ?

Notes on [Guidelines](https://docs.python.org/3.7/library/multiprocessing.html#programming-guidelines)

## concurrent

Notes on how `concurrent.futures` differs from the `threading`/`multiprocessing` modules and their pools.

Executors!
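A minimal sketch of the executor API with a thread pool: `submit()` returns futures, `as_completed()` yields them in completion order, and `map()` keeps input order. `slow_square` is a hypothetical helper whose sleep makes later inputs finish sooner.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def slow_square(x):
    time.sleep(0.05 * (3 - x))  # later inputs finish sooner
    return x * x

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(slow_square, x): x for x in range(3)}
    completed = []
    for fut in as_completed(futures):  # completion order, not submission order
        completed.append((futures[fut], fut.result()))

    ordered = list(executor.map(slow_square, range(3)))  # input order

print(completed)
print(ordered)  # [0, 1, 4]
```

Swapping in `ProcessPoolExecutor` keeps the same interface, which is the main convenience over managing `Thread` or `Pool` objects directly.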

## subprocess

## sched

In [52]:
import sched

In [53]:
def g(name):
    print(name, end=" ->")
    f()

In [54]:
scheduler = sched.scheduler()
scheduler.enter(delay=5, priority=1, action=g, argument=('A',))
scheduler.enter(delay=2, priority=3, action=g, argument=('B',))
scheduler.enter(delay=1, priority=2, action=g, argument=('C',))
scheduler.enter(delay=2, priority=1, action=g, argument=('D',))
scheduler.run()

C ->Thread (MainThread) (4583409152) function execution begins . . . And Done!
B ->Thread (MainThread) (4583409152) function execution begins . . . And Done!
D ->Thread (MainThread) (4583409152) function execution begins . . . And Done!
A ->Thread (MainThread) (4583409152) function execution begins . . . And Done!
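In the output above, B runs before D even though D has the higher priority (lower number). A likely explanation: `enter()` turns each relative delay into an absolute time when it is called, so B (entered first) is scheduled a few microseconds earlier than D, and priority only breaks exact ties. This sketch uses `enterabs()` with identical times to show the tie-break; the event labels are arbitrary.

```python
import sched
import time

order = []
scheduler = sched.scheduler(time.monotonic, time.sleep)
now = time.monotonic()
# Same absolute time for B and D: the lower priority number runs first
scheduler.enterabs(now + 0.1, 3, order.append, argument=("B",))
scheduler.enterabs(now + 0.1, 1, order.append, argument=("D",))
scheduler.enterabs(now + 0.05, 2, order.append, argument=("C",))
scheduler.run()
print(order)  # ['C', 'D', 'B']
```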


## queue

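The thread-safe `queue` module (`Queue`, `LifoQueue`, `PriorityQueue`, `SimpleQueue`) could probably cover the examples above; cover FIFO, LIFO, and priority ordering.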
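A minimal sketch of the three orderings, putting the same items into each queue type and draining them in a single thread; `drain` is a hypothetical helper.

```python
import queue

def drain(q):
    # empty() is only reliable here because a single thread uses the queue
    out = []
    while not q.empty():
        out.append(q.get())
    return out

fifo, lifo, prio = queue.Queue(), queue.LifoQueue(), queue.PriorityQueue()
for q in (fifo, lifo, prio):
    for x in (3, 1, 2):
        q.put(x)

fifo_out, lifo_out, prio_out = drain(fifo), drain(lifo), drain(prio)
print(fifo_out)  # [3, 1, 2]
print(lifo_out)  # [2, 1, 3]
print(prio_out)  # [1, 2, 3]
```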

# [Networking & IPC](https://docs.python.org/3.7/library/ipc.html)

## asyncio

## socket

## mmap