# Batch processing using joblib and tqdm

In [None]:
from math import ceil
import random
import datetime
from joblib import Parallel, delayed

from tqdm.auto import tqdm

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns

from tqdm_batch import batch_process

# Scale fonts and line widths up using seaborn's "poster" plotting context.
sns.set_context(context='poster')

A simple function to use as the batch-processing workload:

In [None]:
def batch_process_function(row, order, payload):
    """
    Simulated unit of work: approximate pi with the Leibniz series.

    None of the arguments are used; row, order and payload only exist so the
    function matches the call signature used throughout this notebook.

    Returns the partial sum of the first 10**6 terms of
    4 * (1 - 1/3 + 1/5 - 1/7 + ...).
    """
    estimate = 0
    for term in range(10**6):
        # Term t has denominator 2t + 1 (1, 3, 5, ...); signs alternate +, -, +, ...
        step = 4 / (2 * term + 1)
        estimate = estimate + step if term % 2 == 0 else estimate - step
    return estimate

This function simply approximates pi:

In [None]:
# One direct call to see the value returned (row and payload are placeholders).
batch_process_function(row='x', order=6, payload=None)

Let's define some demo settings:

In [None]:
# Settings shared by the benchmark cells below.
order = 6          # passed through to the worker function on every call
N = 10 ** 3        # number of work items
items = range(0, N)

Process serially:

In [None]:
%%time
result = [batch_process_function(row, order, None) for row in items]

In [None]:
%%time
result = Parallel(n_jobs=8)(
    delayed(batch_process_function)
    (row, order, None) 
    for row in tqdm(items)
)

When every task also receives a large payload, the cost of shipping that payload to the workers can grow so large that the parallel run takes much longer than the serial one:

In [None]:
# Large payload: 500 * 500 * 100 = 25M float64 samples from N(0, 1), about 200 MB.
matrix = np.random.normal(loc=0.0, scale=1.0, size=(500, 500, 100))

In [None]:
%%time
result = Parallel(n_jobs=8)(
    delayed(batch_process_function)
    (row, order, matrix) 
    for row in tqdm(items)
)

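Working in batches brings us back on track: each task now handles a whole batch of rows, so the payload is sent once per batch instead of once per row, which keeps the IO to a minimum: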

In [None]:
%%time

# Number of batches to split `items` into; the batch size below is derived from it.
n_workers = 8

# Create a batch function
def proc_batch(batch, order, matrix):
    """Apply batch_process_function to every row of one batch.

    Used as the unit of work for joblib so that a single task handles a whole
    batch, and therefore receives `matrix` once per batch instead of once per row.

    Returns a list with one result per row, in the same order as `batch`.
    """
    outputs = []
    for entry in batch:
        outputs.append(batch_process_function(entry, order, matrix))
    return outputs

# Divide data in batches: about n_workers contiguous slices of `items`.
# max(1, ...) keeps the range() step non-zero when `items` is empty.
batch_size = max(1, ceil(len(items) / n_workers))
batches = [
    items[start:start + batch_size]
    for start in range(0, len(items), batch_size)
]

# Divide the work: one joblib task per batch, using the same worker count
# that was used to size the batches (previously hard-coded to 8).
batch_results = Parallel(n_jobs=n_workers)(
    delayed(proc_batch)(batch, order, matrix)
    for batch in tqdm(batches)
)

# joblib returns results in input order and the batches are contiguous, so
# flattening gives one entry per item, like the serial and per-item cells.
result = [value for batch_result in batch_results for value in batch_result]

All of this is wrapped up in the `tqdm_batch` package:

In [None]:
result = batch_process(
    items,
    batch_process_function,
    order=6,
    n_workers=6,
    payload=matrix,
    sep_progress=True,
)

## How many CPUs are optimal?

In [None]:
%%time
result = []
for n_workers in range(1, 13):
    start = datetime.datetime.now()
    _ = batch_process(
        items,
        batch_process_function,
        order=6,
        n_workers=n_workers,
        payload=None,
        sep_progress=False,
    )
    dt = datetime.datetime.now() - start
    result.append({'workers': n_workers, 'time':dt})
    

In [None]:
# Tabulate the timings and add the duration in seconds as column 'dt'.
df = pd.DataFrame(result)
df['dt'] = df['time'].dt.total_seconds()

In [None]:
# Runtime versus number of workers, with a dashed line at the fastest run.
fig, ax = plt.subplots(figsize=(12, 8))
sns.lineplot(x='workers', y='dt', data=df, ax=ax)
# Span the reference line over the worker counts actually measured instead
# of a hard-coded [1, 12], so it stays correct if the sweep range changes.
best = df.dt.min()
ax.plot([df.workers.min(), df.workers.max()], [best, best], 'k--')
_ = ax.set_ylabel('time [s]')
sns.despine()

In [None]:
from pathlib import Path

# Export the figure, creating the assets folder first so savefig does not
# fail with FileNotFoundError when it is missing.
out_path = Path('../assets/optimal_cpu.png')
out_path.parent.mkdir(parents=True, exist_ok=True)
fig.savefig(out_path, bbox_inches='tight')