In [9]:
import numpy as np
from time import time as now

# Seeded generator so the statistics printed below are reproducible on
# Restart & Run All; N_SAMPLES sets the problem size for every benchmark.
SEED = 42
N_SAMPLES = 10_000_000
rng = np.random.default_rng(SEED)
x = rng.uniform(0.0, 1.0, size=(N_SAMPLES,))

## Single-pass computing (pure-Python loop)

In [10]:
def g(x):
  """Return (mean, population std, min, max) of ``x`` in one pure-Python pass.

  Accumulates the running sum, sum of squares, minimum and maximum element by
  element; this is the pure-Python baseline for the timings in this notebook.

  Args:
    x: a non-empty sized iterable of numbers (e.g. a 1-D NumPy array or list).

  Returns:
    (mu, std, m, M) where std = sqrt(E[x^2] - E[x]^2), i.e. ddof=0 like np.std.

  Raises:
    ValueError: if ``x`` is empty.
  """
  n = len(x)
  if n == 0:
    raise ValueError("g() requires a non-empty input")
  mu, mu2, m, M = 0., 0., np.inf, -np.inf
  for xi in x:
    mu += xi
    mu2 += xi * xi
    m = min(m, xi)
    M = max(M, xi)
  mu /= n
  # E[x^2] - E[x]^2 suffers from cancellation and can round to a tiny negative
  # value (e.g. for constant inputs). Clamp it so std is a real 0.0 rather than
  # a complex number (Python floats) or nan (NumPy floats).
  var = max(mu2 / n - mu * mu, 0.0)
  std = var ** 0.5
  return mu, std, m, M

tic = now()
stats = g(x)
toc = now()
print(f"Sync (single pass): {toc - tic:.10f} seconds")
# Display the result computed above instead of running the slow loop twice.
stats

Sync (single pass): 11.7864890099 seconds


(0.499827226533852,
 0.2887206833330619,
 1.3349403715778863e-07,
 0.999999951870965)

## Parallel For Loop

In [11]:
import numpy as np
from pathos.multiprocessing import ProcessingPool as Pool

def g_block(x_block):
  """Return the partial statistics (sum, sum of squares, min, max) of one block.

  Intended to run inside a pathos worker, hence the function-local numpy
  import: it guarantees ``np`` is in scope in each worker process.
  """
  import numpy as np  # Ensure numpy is imported in each worker

  block_total = np.sum(x_block)
  squares = x_block ** 2
  return block_total, np.sum(squares), np.min(x_block), np.max(x_block)

def g_parallel_pathos(x, num_processes=4):
  """Compute (mean, population std, min, max) of ``x`` with a pathos process pool.

  ``x`` is split into at most ``num_processes`` contiguous, non-empty blocks;
  ``g_block`` is mapped over the blocks by the pool and the partial sums and
  extrema are combined here in the calling process.

  Args:
    x: array-like of numbers; converted with ``np.asarray``.
    num_processes: number of pool nodes (and maximum number of blocks).

  Returns:
    (mu, std, global_min, global_max), with std computed using ddof=0.

  Raises:
    ValueError: if ``x`` is empty or ``num_processes`` < 1.
  """
  x = np.asarray(x)
  n = len(x)
  if n == 0:
    raise ValueError("g_parallel_pathos() requires a non-empty input")
  if num_processes < 1:
    raise ValueError("num_processes must be >= 1")
  # Ceiling division: every element lands in a block and no block is empty.
  block_size = (n + num_processes - 1) // num_processes
  blocks = [x[i:i+block_size] for i in range(0, n, block_size)]

  with Pool(nodes=num_processes) as pool:
    results = pool.map(g_block, blocks)

  total_mu = sum(r[0] for r in results)
  total_mu2 = sum(r[1] for r in results)
  global_min = min(r[2] for r in results)
  global_max = max(r[3] for r in results)

  mu = total_mu / n
  # Clamp: E[x^2] - E[x]^2 can round slightly below zero (cancellation),
  # which would otherwise yield nan/complex instead of a zero std.
  std = max(total_mu2 / n - mu * mu, 0.0) ** 0.5
  return mu, std, global_min, global_max

tic = now()
stats = g_parallel_pathos(x)
toc = now()
# Label this run correctly (it was copy-pasted as "Sync (single pass)").
print(f"Parallel (pathos): {toc - tic:.10f} seconds")
# Display the timed result instead of recomputing it.
stats

Sync (single pass): 0.7826633453 seconds


(0.49982722653384276,
 0.28872068333304635,
 1.3349403715778863e-07,
 0.999999951870965)

## NumPy Computing

In [12]:
def f(x):
  """Return (mean, population std, min, max) of ``x`` using NumPy reductions."""
  reducers = (np.mean, np.std, np.min, np.max)
  return tuple(op(x) for op in reducers)


tic = now()
stats = f(x)
toc = now()
print(f"Sync: {toc - tic:.10f} seconds")
# Display the timed result instead of recomputing it.
stats

Sync: 0.2758500576 seconds


(0.49982722653384265,
 0.2887206833330463,
 1.3349403715778863e-07,
 0.999999951870965)

## Async Computing

In [13]:
import asyncio
import numpy as np
from concurrent.futures import ThreadPoolExecutor

def calc_mean(array):
  """Arithmetic mean over all elements of ``array``."""
  return np.mean(array, axis=None)

def calc_std(array):
  """Population standard deviation (ddof=0) over all elements of ``array``."""
  return np.std(array, ddof=0)

def calc_min(array):
  """Smallest element of ``array`` (reduced over all axes)."""
  return np.min(array, axis=None)

def calc_max(array):
  """Largest element of ``array`` (reduced over all axes)."""
  return np.max(array, axis=None)

async def async_calculate_all(array):
  """Compute (mean, std, min, max) of ``array`` concurrently in worker threads.

  Each of calc_mean / calc_std / calc_min / calc_max is submitted to a fresh
  ThreadPoolExecutor through the running event loop; all four are awaited
  together and returned as a tuple in that order.
  """
  loop = asyncio.get_running_loop()
  reducers = (calc_mean, calc_std, calc_min, calc_max)
  with ThreadPoolExecutor() as pool:
    # Create all the tasks up front; do NOT await each one immediately...
    futures = [loop.run_in_executor(pool, fn, array) for fn in reducers]
    # ...then wait for all of them together.
    results = await asyncio.gather(*futures)
  return tuple(results)

tic = now()
stats = await async_calculate_all(x)
toc = now()
print(f"ASync: {toc - tic:.10f} seconds")
await async_calculate_all(x)

ASync: 0.2564344406 seconds


(0.49982722653384265,
 0.2887206833330463,
 1.3349403715778863e-07,
 0.999999951870965)

## Async For Loop

In [14]:
import asyncio
import numpy as np
from concurrent.futures import ThreadPoolExecutor

def g_block(x_block):
  """Return the partial statistics of one block: (sum, sum of squares, min, max).

  NOTE(review): this redefines the g_block from the pathos cell above (without
  its function-local numpy import). g_parallel_pathos looks up g_block by
  global name at call time, so it will use this version if re-run after this
  cell.
  """
  running_sum = np.sum(x_block)
  squared = x_block ** 2
  return running_sum, np.sum(squared), np.min(x_block), np.max(x_block)

async def async_g_parallel(x, num_threads=4):
  """Compute (mean, population std, min, max) of ``x`` over blocks in threads.

  ``x`` is split into at most ``num_threads`` contiguous, non-empty blocks;
  ``g_block`` runs on each block in a ThreadPoolExecutor (awaited through the
  running event loop) and the partial results are combined here.

  Args:
    x: array-like of numbers; converted with ``np.asarray``.
    num_threads: executor size (and maximum number of blocks).

  Returns:
    (mu, std, global_min, global_max), with std computed using ddof=0.

  Raises:
    ValueError: if ``x`` is empty or ``num_threads`` < 1.
  """
  x = np.asarray(x)
  n = len(x)
  if n == 0:
    raise ValueError("async_g_parallel() requires a non-empty input")
  if num_threads < 1:
    raise ValueError("num_threads must be >= 1")
  # Ceiling division: every element lands in a block and no block is empty.
  block_size = (n + num_threads - 1) // num_threads
  blocks = [x[i:i+block_size] for i in range(0, n, block_size)]

  loop = asyncio.get_running_loop()
  with ThreadPoolExecutor(max_workers=num_threads) as executor:
    # Submit every block before awaiting any, so they run concurrently.
    tasks = [loop.run_in_executor(executor, g_block, block) for block in blocks]
    results = await asyncio.gather(*tasks)

  total_mu = sum(r[0] for r in results)
  total_mu2 = sum(r[1] for r in results)
  global_min = min(r[2] for r in results)
  global_max = max(r[3] for r in results)

  mu = total_mu / n
  # Clamp: E[x^2] - E[x]^2 can round slightly below zero (cancellation),
  # which would otherwise yield nan instead of a zero std.
  std = max(total_mu2 / n - mu * mu, 0.0) ** 0.5
  return mu, std, global_min, global_max

tic = now()
stats = await async_g_parallel(x)
toc = now()
print(f"ASync: {toc - tic:.10f} seconds")
await async_g_parallel(x)

ASync: 0.1435821056 seconds


(0.49982722653384276,
 0.28872068333304635,
 1.3349403715778863e-07,
 0.999999951870965)