# Multithreading

In [1]:
import itertools
import time
from threading import Thread, Event

def spin(msg: str, done: Event) -> None:
    """Animate a one-character text spinner in front of ``msg`` until ``done`` is set.

    Args:
        msg: Text shown to the right of the spinning character.
        done: Event polled between frames; once it is set the animation
            stops and the status line is erased.
    """
    frames = itertools.cycle(r'\|/-')
    while True:
        # The leading carriage return moves the cursor back to the start of
        # the line so every frame overwrites the previous one.
        status = f'\r{next(frames)} {msg}'
        print(status, end='', flush=True)
        # wait() doubles as the frame delay: it returns True as soon as the
        # event is set, or False after the 0.1 s timeout.
        if done.wait(.1):
            break
    # Overwrite the last frame with spaces and park the cursor at column 0.
    eraser = ' ' * len(status)
    print(f'\r{eraser}\r', end='')

def slow() -> int:
    """Simulate a blocking task: sleep for three seconds, then return 42."""
    pause_seconds = 3
    time.sleep(pause_seconds)
    answer = 42
    return answer

def supervisor() -> int:
    """Run ``slow()`` in the calling thread while a spinner animates in a helper thread.

    Returns:
        The value returned by ``slow()``.

    Raises:
        Whatever ``slow()`` raises; the exception propagates only after the
        spinner thread has been told to stop and has been joined.
    """
    done = Event()
    spinner = Thread(target=spin, args=('thinking!', done))
    print(f'spinner object: {spinner}')
    spinner.start()
    try:
        result = slow()
    finally:
        # Always release the spinner: if slow() raised and the event were
        # never set, nothing would ever stop the spinner thread.
        done.set()
        spinner.join()
    return result

def main() -> None:
    """Run the threaded spinner demo and print the value it produced."""
    print(f'Answer: {supervisor()}')

# Run the threaded spinner demo.
main()

spinner object: <Thread(Thread-6, initial)>
Answer: 42  


## Multiprocess

In [2]:
from multiprocessing import Process, Event
from multiprocessing import synchronize

def supervisor_vp() -> int:
    """Run ``slow()`` in this process while a spinner animates in a child process.

    Returns:
        The value returned by ``slow()``.

    Raises:
        Whatever ``slow()`` raises; the exception propagates only after the
        spinner process has been told to stop and has been joined.
    """
    done = Event()
    spinner = Process(target=spin, args=('thinking!', done))
    print(f'spinner object: {spinner}')
    spinner.start()
    try:
        result = slow()
    finally:
        # Always release the spinner: if slow() raised and the event were
        # never set, the child process would keep spinning with nothing
        # left to stop it.
        done.set()
        spinner.join()
    return result

def main_vp() -> None:
    """Run the multiprocessing spinner demo and print the value it produced."""
    result = supervisor_vp()
    print(f'Answer: {result}')
    
# Run the multiprocessing spinner demo.
main_vp()

spinner object: <Process name='Process-1' parent=12372 initial>
Answer: 42


## Asyncio
Code only: Jupyter already runs an event loop, so `asyncio.run()` cannot be called from a notebook cell. See 'asyncio_examples.py' for a runnable version.

In [3]:
import asyncio
import itertools

async def spin_va(msg: str) -> None:
    """Animate a one-character text spinner in front of ``msg`` until cancelled.

    Cancellation is the stop signal: the ``CancelledError`` delivered while
    sleeping between frames is caught, the status line is erased, and the
    coroutine returns normally.

    Args:
        msg: Text shown to the right of the spinning character.
    """
    frames = itertools.cycle(r'\|/-')
    while True:
        # The leading carriage return makes each frame overwrite the last.
        status = f'\r{next(frames)} {msg}'
        print(status, flush=True, end='')
        try:
            # Awaiting here is what hands control back to the event loop.
            await asyncio.sleep(.1)
        except asyncio.CancelledError:
            break
    # Overwrite the last frame with spaces and park the cursor at column 0.
    eraser = ' ' * len(status)
    print(f'\r{eraser}\r', end='')

async def slow_va() -> int:
    """Simulate a slow awaitable task: sleep three seconds without blocking, then return 42."""
    pause_seconds = 3
    await asyncio.sleep(pause_seconds)
    answer = 42
    return answer

async def supervisor_va() -> int:
    """Await ``slow_va()`` while a spinner task animates on the same event loop.

    Returns:
        The value returned by ``slow_va()``.

    Raises:
        Whatever ``slow_va()`` raises; the exception propagates only after
        the spinner task has been cancelled and has finished.
    """
    spinner = asyncio.create_task(spin_va('thinking!'))
    print(f'spinner object: {spinner}')
    try:
        result = await slow_va()
    finally:
        # Stop the spinner on every exit path, not just on success.
        spinner.cancel()
        # Wait until the spinner has actually finished, so its line-erasing
        # output cannot land after whatever the caller prints next.
        # return_exceptions=True keeps a CancelledError coming out of the
        # spinner task from propagating into this coroutine.
        await asyncio.gather(spinner, return_exceptions=True)
    return result

def main_va() -> None:
    """Drive ``supervisor_va()`` to completion and print the value it produced.

    ``asyncio.run()`` starts its own event loop, so call this only from code
    where no event loop is already running.
    """
    answer = asyncio.run(supervisor_va())
    print(f'Answer: {answer}')

### CPU-intensive function

In [4]:
import math

def is_prime(n: int = 5000111000222021) -> bool:
    """Return True if ``n`` is prime, using trial division by odd numbers.

    Args:
        n: Integer to test. Values below 2 are reported as not prime.

    Returns:
        True when no integer in ``[2, isqrt(n)]`` divides ``n``, else False.
    """
    if n < 2:
        return False
    if n % 2 == 0:
        # 2 is the only even prime.
        return n == 2

    # Any composite n has a divisor no larger than its integer square root,
    # and even divisors were ruled out above, so only odd candidates remain.
    limit = math.isqrt(n)
    for candidate in range(3, limit + 1, 2):
        if not n % candidate:
            return False
    return True

In [5]:
# Swap the simulated delay for real CPU-bound work. supervisor() and
# supervisor_vp() look up the global name `slow` when they run, so from here
# on both of them call is_prime() with its default argument.
slow = is_prime
main()

spinner object: <Thread(Thread-7, initial)>
Answer: True


In [6]:
# Run the multiprocessing spinner demo again.
main_vp()

spinner object: <Process name='Process-2' parent=12372 initial>
Answer: True


With asyncio, the spinner would never spin: `is_prime` is given control and only yields it back once it has finished. By then the supervisor is finished as well, so the spinner never gets a chance to run :(

# A Homegrown Process Pool
In this example we will write a program that checks whether numbers are prime. First, the basic sequential solution:

In [7]:
# Integers whose primality is checked by the benchmark functions below.
NUMBERS = (
    2,
    3333333333333333,
    4444444444444444,
    5555555555555555,
    6666666666666666,
    142702110479723,
    7777777777777777,
    299593572317531,
    9999999999999999,
    3333333333333301,
    3333335652092209,
    4444444488888889,
    4444444444444423,
    5555553133149889,
    5555555555555503,
    6666666666666719,
    6666667141414921,
    7777777536340681,
    7777777777777753,
    9999999999999917,
)

In [8]:
from time import perf_counter
from typing import NamedTuple

class Result(NamedTuple):
    """Outcome of one timed primality check."""
    prime: bool     # value returned by is_prime() for the checked number
    elapsed: float  # perf_counter() time spent inside that is_prime() call

def check(n: int) -> Result:
    """Test ``n`` for primality and measure how long the test took.

    Args:
        n: Integer passed to ``is_prime()``.

    Returns:
        A ``Result`` holding the primality verdict and the elapsed
        ``perf_counter()`` time of the ``is_prime()`` call.
    """
    started = perf_counter()
    verdict = is_prime(n)
    finished = perf_counter()
    return Result(prime=verdict, elapsed=finished - started)

def main() -> None:
    """Check every entry of ``NUMBERS`` one after another and print a timing report.

    Each line shows the number, a ``P`` marker when it is prime, and the time
    its check took; the last line shows the total wall-clock time.
    """
    print(f'Checking {len(NUMBERS)} numbers sequentially:')
    started = perf_counter()
    for n in NUMBERS:
        outcome = check(n)
        label = 'P' if outcome.prime else ' '
        print(f'{n:16}  {label} {outcome.elapsed:9.6f}s')

    total = perf_counter() - started
    print(f'Total time: {total:.2f}s')

# Run the sequential benchmark.
main()

Checking 20 numbers sequentially:
               2  P  0.000001s
3333333333333333     0.000003s
4444444444444444     0.000000s
5555555555555555     0.000003s
6666666666666666     0.000001s
 142702110479723  P  0.400163s
7777777777777777     0.000005s
 299593572317531  P  0.574872s
9999999999999999     0.000029s
3333333333333301  P  1.901568s
3333335652092209     1.873971s
4444444488888889     2.175465s
4444444444444423  P  2.158195s
5555553133149889     2.581901s
5555555555555503  P  2.456280s
6666666666666719  P  2.625210s
6666667141414921     2.651568s
7777777536340681     2.842613s
7777777777777753  P  2.840014s
9999999999999917  P  3.283686s
Total time: 28.37s


### multiprocessing solution
This doesn't work in Jupyter; see 'multiprocessing_example.py'.

In [9]:
import sys
from time import perf_counter
from typing import NamedTuple
from multiprocessing import Process, SimpleQueue, cpu_count
from multiprocessing import queues

class PrimeResult(NamedTuple):
    """Outcome of one timed primality check, tagged with the number that was checked."""
    n: int          # the number that was tested
    prime: bool     # value returned by is_prime(n)
    elapsed: float  # perf_counter() time spent inside that is_prime() call

# Type aliases for the two inter-process queues: one carries the integers to
# test, the other carries the PrimeResult produced for each of them.
JobQueue = queues.SimpleQueue[int]
ResultQueue = queues.SimpleQueue[PrimeResult]

def check(n: int) -> PrimeResult:
    """Test ``n`` for primality and measure how long the test took.

    Args:
        n: Integer passed to ``is_prime()``.

    Returns:
        A ``PrimeResult`` holding ``n``, the primality verdict, and the
        elapsed ``perf_counter()`` time of the ``is_prime()`` call.
    """
    started = perf_counter()
    verdict = is_prime(n)
    finished = perf_counter()
    return PrimeResult(n=n, prime=verdict, elapsed=finished - started)

def worker(jobs: JobQueue, results: ResultQueue) -> None:
    """Take numbers from ``jobs``, check each one, and put the outcome on ``results``.

    The loop ends as soon as a falsy value (the ``0`` sentinel) is taken from
    ``jobs``; nothing is put on ``results`` for that value.

    Args:
        jobs: Queue of integers to test.
        results: Queue receiving one ``PrimeResult`` per tested integer.
    """
    while True:
        n = jobs.get()
        if not n:
            break
        results.put(check(n))
        
def main(n_worker: int = 0) -> None:
    """Check every entry of ``NUMBERS`` using a pool of worker processes.

    All numbers are queued up front, then the workers are started and one
    ``0`` sentinel per worker is appended so each of them eventually stops.
    Results are printed in the order the workers deliver them, followed by
    the total wall-clock time.

    ``NUMBERS`` must not contain ``0``: ``worker()`` treats a falsy job as
    its stop signal and produces no result for it.

    Args:
        n_worker: Number of worker processes to start. ``0`` (the default)
            means ``cpu_count()``.

    Raises:
        ValueError: If ``n_worker`` is negative.
    """
    if n_worker < 0:
        raise ValueError(f'n_worker must be >= 0, got {n_worker}')
    workers = n_worker or cpu_count()

    print(f'Checking {len(NUMBERS)} numbers with {workers} processes:')

    jobs: JobQueue = SimpleQueue()
    results: ResultQueue = SimpleQueue()
    t0 = perf_counter()

    for n in NUMBERS:
        jobs.put(n)

    procs = []
    for _ in range(workers):
        proc = Process(target=worker, args=(jobs, results))
        proc.start()
        procs.append(proc)
        jobs.put(0)  # one stop sentinel per worker

    # Collect exactly one result per submitted number. Polling jobs.empty()
    # to decide when to stop is racy: the job queue can drain while results
    # are still unread (losing output), or the last result can be read before
    # the last sentinel is taken (blocking forever on results.get()).
    for _ in range(len(NUMBERS)):
        n, prime, elapsed = results.get()
        label = 'P' if prime else ' '
        print(f'{n:16}  {label} {elapsed:9.6f}s')

    # Every result is in, so each worker is at or past its sentinel; wait
    # for the processes to exit instead of leaving them unjoined.
    for proc in procs:
        proc.join()

    elapsed = perf_counter() - t0
    print(f'Total time: {elapsed:.2f}s')

### multithreaded Version
A multithreaded version would be even slower than the sequential one, because threads add overhead but, for CPU-bound work like this, no speed advantage is gained.