## Parallelize via Async
Useful for I/O-bound operations such as async API calls, async DB calls, etc.

In [8]:
import asyncio
import time

async def parallize_via_asyncio_gather(func, params, semaphore=None, *, return_exceptions=False):
  """Run ``func(x, semaphore)`` concurrently for every ``x`` in ``params``.

  Args:
    func: Coroutine function called as ``func(x, semaphore)``.
    params: Iterable of inputs; one call is scheduled per element.
    semaphore: Passed through unchanged as the second argument of every call.
    return_exceptions: Forwarded to ``asyncio.gather``. With the default
      ``False`` the first exception raised by any call propagates to the
      caller and no results are returned. With ``True`` an exception is
      returned in the position of the call that raised it, so the results
      of the other calls are kept.

  Returns:
    A list of results in the same order as ``params``.
  """
  return await asyncio.gather(
    *(func(x, semaphore) for x in params),
    return_exceptions=return_exceptions,
  )

In [9]:
# Ten dummy inputs (1 through 10), one per simulated call.
params = list(range(1, 11))
# Semaphore created with an initial value of 4; it is handed to every call below.
semaphore = asyncio.Semaphore(4)

async def io_bound_func(some_param, semaphore):
  """
  Simulate a slow I/O-bound call (for example, an inference request).

  The semaphore is held for the whole simulated call, so it caps how many of
  these calls can be in flight at once. After one second the parameter is
  printed and returned unchanged.
  """
  async with semaphore:
    # asyncio.sleep yields to the event loop; time.sleep is synchronous and
    # would block it, so it cannot be used to mock this call.
    await asyncio.sleep(1)
    message = f"param: {some_param}"
    print(message)
  return some_param


In [10]:
import asyncio
asyncio.Semaphore.__aenter__??

Signature: asyncio.Semaphore.__aenter__(self)
Docstring: <no docstring>
Source:   
    async def __aenter__(self):
        await self.acquire()
        # We have no use for the "as ..."  clause in the with
        # statement for locks.
        return None
File:      ~/miniconda3/lib/python3.12/asyncio/locks.py
Type:      function

In [11]:
counter = 4  # number of free slots; a hand-rolled stand-in for a semaphore's value

async def check_counter():
  """Wait until a slot is free, then claim it by decrementing ``counter``.

  While ``counter`` is 0 the coroutine yields control on every pass, so
  coroutines that already hold a slot get a chance to run and give it back.
  """
  global counter
  while counter == 0:
    # A bare `continue` here would spin forever without ever returning
    # control to the event loop, so nothing else could increment `counter`.
    await asyncio.sleep(0)
  counter -= 1

async def io_bound_func_changed(some_param, semaphore=None):
  """
  A slow function that does some processing on some parameters, e.g. an inference call.

  Concurrency is limited by the module-level ``counter`` instead of a real
  semaphore: a slot is claimed through ``check_counter()`` before the work
  starts and handed back when the work ends.

  ``semaphore`` is accepted but not used by this function.

  Returns ``some_param`` unchanged after the simulated one-second call.
  """
  global counter
  await check_counter()  # claims a slot (decrements counter)
  try:
    # time.sleep() is synchronous thus blocking, so we can't use that to mock this function!
    await asyncio.sleep(1)
    print(f"param: {some_param}")
    return some_param
  finally:
    # Hand the slot back even if the call fails or is cancelled, so a failed
    # call cannot permanently reduce the available concurrency.
    counter += 1


start = time.time()
outputs = await parallize_via_asyncio_gather(io_bound_func_changed, params)
print(f"total time: {time.time() - start} seconds")
print(f"outputs: {outputs}")

In [16]:
start = time.time()
outputs = await parallize_via_asyncio_gather(io_bound_func, params, semaphore)
print(f"total time: {time.time() - start} seconds")
print(f"outputs: {outputs}")

param: 1
param: 2
param: 3
param: 4
param: 5
param: 6
param: 7
param: 8
param: 9
param: 10
total time: 3.004940986633301 seconds
outputs: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


What happens if an individual function call raises an exception, e.g. when an inference call fails to run?

In [4]:
# Inputs 1 through 10; the function defined in this cell raises for the value 6.
params = list(range(1, 11))
# Fresh semaphore with an initial value of 4 for this experiment.
semaphore = asyncio.Semaphore(4)

async def io_bound_func_w_exceptions(some_param, semaphore):
  """
  Simulate a slow I/O-bound call (for example, an inference request) that can fail.

  The semaphore is held for the whole simulated call. After one second the
  parameter is printed; the value 6 then raises an exception, and every other
  value is returned unchanged.
  """
  async with semaphore:
    await asyncio.sleep(1)
    message = f"param: {some_param}"
    print(message)
    if some_param != 6:
      return some_param
    # Deliberate failure used to show how one failing call affects the rest.
    raise Exception("Raising an arbitrary exception")

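If any individual async call raises an exception, `asyncio.gather` propagates it to the caller right away. The remaining calls keep running (note that `param: 9` and `param: 10` are still printed after the exception below), but the assignment never happens, so we fail to save the outputs from the other function calls. This unfortunately happens way too often in production inference settings.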

In [5]:
start = time.time()
new_outputs = await parallize_via_asyncio_gather(io_bound_func_w_exceptions, params, semaphore)
print(f"total time: {time.time() - start} seconds")
print(f"outputs: {new_outputs}")

param: 1
param: 2
param: 3
param: 4
param: 5
param: 6
param: 7
param: 8


Exception: Raising an arbitrary exception

param: 9
param: 10


In [6]:
# Try to inspect the results. The recorded output is a NameError: the gather call
# in the previous cell raised before `new_outputs` was ever assigned.
new_outputs

NameError: name 'new_outputs' is not defined

To get around this, we use `asyncio.as_completed`, which has some pros and cons

pros:
- you can set individual timeouts easily (to account for cases like inference call stalling)
- you can process exceptions for individual function calls

cons:
- it doesn't preserve the sequence of the returned outputs, it will instead be ordered by which function call finishes first.

In [7]:
from tqdm.asyncio import tqdm

async def parallize_via_asyncio_as_completed(func, params, semaphore, timeout=3):
    """Run ``func(param, semaphore)`` for every param and collect results as they finish.

    Unlike a plain ``asyncio.gather``, a failing call does not discard the
    results of the others: its exception is printed and the call is skipped.

    Args:
        func: Coroutine function called as ``func(param, semaphore)``.
        params: Sequence of inputs; one call is scheduled per element.
        semaphore: Passed through unchanged as the second argument of every call.
        timeout: Seconds to wait for each next-to-finish result, or ``None`` to
            wait indefinitely. Defaults to 3.

    Returns:
        List of successful results in completion order (not in ``params`` order).
    """
    outputs = []
    coros = [func(param, semaphore) for param in params]
    # alternatively, use asyncio.as_completed to remove the progress bar
    for future in tqdm.as_completed(coros, total=len(coros)):
        try:
            # The timeout bounds this wait for the *next* completed call; it is
            # not measured from the moment an individual call started.
            # NOTE(review): a timeout here presumably does not cancel the
            # underlying call itself - confirm against the asyncio docs.
            output = await asyncio.wait_for(future, timeout=timeout)
        except Exception as e:
            print(f"Caught exception: {e}")
        else:
            outputs.append(output)

    return outputs

In [8]:
# Run the exception-raising workload through the as_completed-based helper.
# Calls that raise are reported by the helper and left out of `outputs`.
outputs = await parallize_via_asyncio_as_completed(io_bound_func_w_exceptions, params, semaphore=semaphore)
print(f"outputs: {outputs}")

 10%|█         | 1/10 [00:01<00:09,  1.00s/it]

param: 6
param: 4
param: 2
param: 9
Caught exception: Raising an arbitrary exception


 50%|█████     | 5/10 [00:02<00:01,  2.76it/s]

param: 7
param: 5
param: 3
param: 1


100%|██████████| 10/10 [00:03<00:00,  3.33it/s]

param: 10
param: 8
outputs: [4, 2, 9, 7, 5, 3, 1, 10, 8]





## Parallelize via Threading
Useful for CPU bound tasks (complex computation) and I/O bound tasks like reading files.

Usually, we want to start with num_cpu_cores number of threads for CPU bound tasks.

In [1]:
from multi_threading_demo import process_pool_parallelized
import time

# perf_counter() is monotonic, so the measured interval is not affected by system clock adjustments.
start = time.perf_counter()
outputs = process_pool_parallelized()
print(f"outputs: {outputs}")
print(f"It took {time.perf_counter() - start} seconds via process pool parallization")

100%|██████████| 100/100 [00:05<00:00, 17.57it/s]

outputs: [0, 333328333350000, 2666646666700000, 8999955000050000, 21333253333400000, 41666541666750000, 71999820000100000, 114333088333450000, 170666346666800000, 242999595000150000, 333332833333500000, 443666061666850000, 575999280000200000, 732332488333550000, 914665686666900000, 1124998875000250000, 1365332053333600000, 1637665221666950000, 1943998380000300000, 2286331528333650000, 2666664666667000000, 3086997795000350000, 3549330913333700000, 4055664021667050000, 4607997120000400000, 5208330208333750000, 5858663286667100000, 6560996355000450000, 7317329413333800000, 8129662461667150000, 8999995500000500000, 9930328528333850000, 10922661546667200000, 11978994555000550000, 13101327553333900000, 14291660541667250000, 15551993520000600000, 16884326488333950000, 18290659446667300000, 19772992395000650000, 21333325333334000000, 22973658261667350000, 24695991180000700000, 26502324088334050000, 28394656986667400000, 30374989875000750000, 32445322753334100000, 34607655621667450000, 36863988




An AMP (asymmetric multiprocessing) architecture contains x performance cores and x efficiency cores. They are all physical cores. Efficiency cores are weaker and are meant for lower-intensity background tasks in order to save battery. A laptop applies different optimizations when plugged in vs. unplugged, so the same function above runs at different speeds:

plugged in:
- 4 workers: 7.3s
- 8 workers: 5.8s

not plugged in:
- 4 workers: 16.5s
- 8 workers: 13.0s

In [1]:
from multi_threading_demo import thread_pool_parallelized
import time

# perf_counter() is monotonic, so the measured interval is not affected by system clock adjustments.
start = time.perf_counter()
outputs = thread_pool_parallelized()
print(f"outputs: {outputs}")
print(f"It took {time.perf_counter() - start} seconds via thread pool parallization")

100it [00:22,  4.43it/s]

outputs: [0, 333328333350000, 2666646666700000, 8999955000050000, 21333253333400000, 41666541666750000, 71999820000100000, 114333088333450000, 170666346666800000, 242999595000150000, 333332833333500000, 443666061666850000, 575999280000200000, 732332488333550000, 914665686666900000, 1124998875000250000, 1365332053333600000, 1637665221666950000, 1943998380000300000, 2286331528333650000, 2666664666667000000, 3086997795000350000, 3549330913333700000, 4055664021667050000, 4607997120000400000, 5208330208333750000, 5858663286667100000, 6560996355000450000, 7317329413333800000, 8129662461667150000, 8999995500000500000, 9930328528333850000, 10922661546667200000, 11978994555000550000, 13101327553333900000, 14291660541667250000, 15551993520000600000, 16884326488333950000, 18290659446667300000, 19772992395000650000, 21333325333334000000, 22973658261667350000, 24695991180000700000, 26502324088334050000, 28394656986667400000, 30374989875000750000, 32445322753334100000, 34607655621667450000, 36863988




- ThreadPool takes the same time as no parallelization because ThreadPool is affected by Python's GIL (Global Interpreter Lock), which only allows one thread to execute Python bytecode at a time.
- ProcessPool gets around this because each process has its own Python interpreter and memory space, at the cost of more overhead and memory consumption. It is better for CPU-bound tasks because it can effectively utilize multiple cores at the same time.
- What is ThreadPool good for then? I/O-bound tasks (e.g. file reads or network requests that you can't use async for). When a thread is waiting for an I/O-bound task to complete (e.g. waiting for a network request to return a response), it releases the GIL, which allows a second thread to execute.
- TLDR: Most of the time, ProcessPool is probably better

In [4]:
from multi_threading_demo import no_parallelize
import time

# perf_counter() is monotonic, so the measured interval is not affected by system clock adjustments.
start = time.perf_counter()
outputs = no_parallelize()
print(f"outputs: {outputs}")
print(f"It took {time.perf_counter() - start} seconds without parallization")

outputs: [0, 333328333350000, 2666646666700000, 8999955000050000, 21333253333400000, 41666541666750000, 71999820000100000, 114333088333450000, 170666346666800000, 242999595000150000, 333332833333500000, 443666061666850000, 575999280000200000, 732332488333550000, 914665686666900000, 1124998875000250000, 1365332053333600000, 1637665221666950000, 1943998380000300000, 2286331528333650000, 2666664666667000000, 3086997795000350000, 3549330913333700000, 4055664021667050000, 4607997120000400000, 5208330208333750000, 5858663286667100000, 6560996355000450000, 7317329413333800000, 8129662461667150000, 8999995500000500000, 9930328528333850000, 10922661546667200000, 11978994555000550000, 13101327553333900000, 14291660541667250000, 15551993520000600000, 16884326488333950000, 18290659446667300000, 19772992395000650000, 21333325333334000000, 22973658261667350000, 24695991180000700000, 26502324088334050000, 28394656986667400000, 30374989875000750000, 32445322753334100000, 34607655621667450000, 36863988