# Batch Processing with `joblib`

In [25]:
from math import ceil, sqrt
import random
import datetime
from joblib import Parallel, delayed, Memory
from typing import Callable, Dict, List, Union
from multiprocessing import Queue, Manager
from threading import Thread

import tqdm as tqdm_
from tqdm.auto import tqdm

import numpy as np
import pandas as pd
import time

import matplotlib.pyplot as plt
import seaborn as sns

from tqdm_batch import batch_process

# Use seaborn's largest preset context so plot text stays readable.
sns.set_context(context='poster')

![img](https://res.cloudinary.com/hevo/images/f_auto,q_auto/v1649315584/hevo-learn/Batch-Processing-Batch-Processing-vs-Stream-Processing/Batch-Processing-Batch-Processing-vs-Stream-Processing.png?_i=AA)

(Source: https://hevodata.com/learn/batch-processing/.)

## Definition

Jobs that can run without end user interaction, or can be scheduled to run as resources permit, are called batch jobs. Batch processing is for those frequently used programs that can be executed with minimal human interaction.

A program that reads a large file and generates a report, for example, is considered to be a batch job.

The term batch job originated in the days when punched cards contained the directions for a computer to follow when running one or more programs. Multiple card decks representing multiple jobs would often be stacked on top of one another in the hopper of a card reader, and be run in batches.

(Source: https://www.ibm.com/docs/en/zos-basic-skills?topic=jobs-what-is-batch-processing.)

Batch processing is often contrasted with *stream* processing, in which each record is processed as soon as it arrives. Stream processing is critical when you need real-time updates to data reports or analyses. But if you are processing large volumes of data, it can be more efficient to process them in batches.

Batch jobs typically run **automatically**, started by a **scheduler** rather than by a user.

More useful introductory discussion [here](https://www.talend.com/resources/batch-processing/).

## Batch size
The batch size refers to the number of work units to be processed within one batch operation. Some examples are:

- The number of lines from a file to load into a database before committing the transaction.
- The number of messages to dequeue from a queue.
- The number of requests to send within one payload.

## Common batch processing usage

- Efficient bulk database updates and automated transaction processing, as opposed to interactive online transaction processing (OLTP) applications.
- The extract, transform, load (ETL) step in populating data warehouses is inherently a batch process in most implementations.
- Performing bulk operations on digital images such as resizing, conversion, watermarking, or otherwise editing a group of image files.
- Converting computer files from one format to another. For example, a batch job may convert proprietary and legacy files to common standard formats for end-user queries and display.

(Source: https://en.wikipedia.org/wiki/Batch_processing.)

## `joblib`

### Advantages

- Disk Caching of Functions & Lazy Re-Evaluation

Separate flow-execution logic from algorithmic logic and **memoize** to speed up computations. That is, cache the results of expensive function calls for later use.

- Parallel Computing

Simple and easy to debug.

- Fast Storage / Compression

More efficient than plain `pickle` for objects containing large data, in particular large NumPy arrays; optional compression is supported.

(Source: https://hevodata.com/learn/python-batch-processing/.)

### Sample Code

In [4]:
# Small float matrix used as the input to a memoized NumPy ufunc.
a = np.vander(np.arange(3)).astype(np.float64)

# Persist cached results under this directory.
# NOTE(review): '/Library/Caches' is a macOS system path; other platforms
# need a different, writable location.
cachedir = '/Library/Caches'
mem = Memory(cachedir)

# Wrap np.square so its results are stored on disk and reused on repeat calls.
square = mem.cache(np.square)
b = square(a)

________________________________________________________________________________
[Memory] Calling square...
square(array([[0., 0., 1.],
       [1., 1., 1.],
       [4., 2., 1.]]))
___________________________________________________________square - 0.0s, 0.0min


In [5]:
# Same input as the previous cell, so the memoized `square` can return the
# stored result instead of calling np.square again (note that no
# "[Memory] Calling square..." log is printed for this cell).
c = square(a)

In [23]:
# A single worker: joblib runs the delayed calls one after another.
Parallel(n_jobs=1)(delayed(sqrt)(k * k) for k in range(10))

[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

In [24]:
%%time

Parallel(n_jobs=2, prefer='threads')(delayed(sqrt)(i**2) for i in range(10))

CPU times: user 3.33 ms, sys: 5.89 ms, total: 9.22 ms
Wall time: 11.5 ms


[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

### `joblib` Example

(Source: https://towardsdatascience.com/using-joblib-to-speed-up-your-python-pipelines-dd97440c653d.)

In [26]:
# Getting the square of the number:
def square_number(no):
    """Return ``no`` multiplied by itself."""
    return (no*no)


# Function to compute square of a range of a number:
def get_square_range(start_no, end_no, delay=1):
    """Return the squares of ``np.arange(start_no, end_no)``, slowly.

    Parameters
    ----------
    start_no, end_no:
        Bounds passed to ``np.arange`` (``end_no`` is exclusive).
    delay:
        Seconds to sleep before each element, to emulate an expensive
        computation. Defaults to 1 s, as in the original demo.

    Returns
    -------
    list
        A new list of squares on every call. The original appended into a
        module-level ``result`` list, so repeated calls returned the
        accumulated values of all previous calls.
    """
    squares = []
    for i in np.arange(start_no, end_no):
        time.sleep(delay)
        squares.append(square_number(i))
    return squares

start = time.time()
# Squares of 1..20: the upper bound passed to the function is exclusive.
final_result = get_square_range(1, 21)
end = time.time()

# Report the wall-clock duration, then the computed squares.
print(f'\nThe function took {end - start:.2f} s to compute.')
print(final_result)


The function took 20.11 s to compute.
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400]


In [27]:
# Define a location to store cache
# NOTE(review): '/Library/Caches' is a macOS system path; other platforms
# need a different, writable directory.
location = '/Library/Caches'
memory = Memory(location, verbose=0)


# Function to compute square of a range of a number:
def get_square_range_cached(start_no, end_no, delay=1):
    """Return the squares of ``np.arange(start_no, end_no)``, slowly.

    The function is wrapped with ``memory.cache`` below, so its return value
    is stored on disk keyed by its arguments. A fresh local list is built on
    each run. The original appended into a module-level ``result`` list, so
    a second run with different arguments would return (and permanently
    cache) the values of earlier calls as well.

    Parameters
    ----------
    start_no, end_no:
        Bounds passed to ``np.arange`` (``end_no`` is exclusive).
    delay:
        Seconds to sleep per element to emulate expensive work (default 1).
    """
    squares = []
    for i in np.arange(start_no, end_no):
        time.sleep(delay)
        squares.append(square_number(i))
    return squares

get_square_range_cached = memory.cache(get_square_range_cached)

start = time.time()
# On a cold cache the function body runs in full, sleeps included.
final_result = get_square_range_cached(1, 21)
end = time.time()

# Report the wall-clock duration, then the computed squares.
print(f'\nThe function took {end - start:.2f} s to compute.')
print(final_result)


The function took 20.11 s to compute.
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400]


In [28]:
start = time.time()
# Identical arguments again: joblib can load the stored result.
final_result = get_square_range_cached(1, 21)
end = time.time()

print(f'\nThe function took {end - start:.2f} s to compute.')
print(final_result)


The function took 0.01 s to compute.
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400]


#### Parallelizing

In [31]:
# Seeded generator so the 10,000 x 4 sample is reproducible.
rng = np.random.RandomState(42)
data = rng.randn(10_000, 4)
# Preview the first five rows.
data[:5]

array([[ 0.49671415, -0.1382643 ,  0.64768854,  1.52302986],
       [-0.23415337, -0.23413696,  1.57921282,  0.76743473],
       [-0.46947439,  0.54256004, -0.46341769, -0.46572975],
       [ 0.24196227, -1.91328024, -1.72491783, -0.56228753],
       [-1.01283112,  0.31424733, -0.90802408, -1.4123037 ]])

In [32]:
def costly_compute(data, column, delay=2):
    """Emulate a costly function by sleeping and returning a column.

    Parameters
    ----------
    data:
        A 2-D NumPy array.
    column:
        Index of the column to return.
    delay:
        Seconds to sleep to emulate expensive work (default 2, as before).

    Returns
    -------
    numpy.ndarray
        ``data[:, column]``, the requested column.
    """
    time.sleep(delay)
    # ``data[column]`` would select a *row* of a 2-D array, contradicting the
    # docstring; slice the column axis explicitly.
    return data[:, column]

def data_processing_mean(data, column):
    """Return the mean of whatever ``costly_compute`` selects for ``column``."""
    picked = costly_compute(data, column)
    return picked.mean()

start = time.time()
# One costly call per column, strictly in sequence.
results = [data_processing_mean(data, c) for c in range(data.shape[1])]
stop = time.time()

print('\nSequential processing',
      f'Elapsed time for the entire processing: {stop - start:.2f} s',
      sep='\n')


Sequential processing
Elapsed time for the entire processing: 8.02 s


In [33]:
# On-disk cache for costly_compute results.
location = '/Library/Caches'
memory = Memory(location=location, verbose=0)
costly_compute_cached = memory.cache(costly_compute)

def data_processing_mean_using_cache(data, column):
    """Return the mean of ``costly_compute_cached(data, column)``.

    Calls with arguments already seen are answered from the joblib cache in
    ``location`` instead of re-running the expensive step.
    """
    selected = costly_compute_cached(data, column)
    return selected.mean()

start = time.time()

# Here is where we adjust the number of workers: 2 workers share the columns.
results = Parallel(n_jobs=2)(
    delayed(data_processing_mean_using_cache)(data, c)
    for c in range(data.shape[1])
)
stop = time.time()

print(f'Elapsed time for the entire processing: {stop - start:.2f} s')

Elapsed time for the entire processing: 5.04 s


In [34]:
# Same cache setup as the previous cell, repeated so this cell is
# self-contained; it points at the same cache directory.
location = '/Library/Caches'
memory = Memory(location=location, verbose=0)
costly_compute_cached = memory.cache(costly_compute)

def data_processing_mean_using_cache(data, column):
    """Mean of the cached ``costly_compute`` output for ``column``."""
    selected = costly_compute_cached(data, column)
    return selected.mean()

start = time.time()

# Let's try 8 workers (more than there are columns to process).
results = Parallel(n_jobs=8)(
    delayed(data_processing_mean_using_cache)(data, c)
    for c in range(data.shape[1])
)
stop = time.time()

print(f'Elapsed time for the entire processing: {stop - start:.2f} s')

Elapsed time for the entire processing: 1.21 s


## `tqdm`

In [30]:
# Running sum over 10 million integers, printing it every millionth step.
num = 0
for j in tqdm_.tqdm(range(10_000_000)):
    num += j
    if j % 1_000_000 == 0:
        print(num)

  7%|███▊                                                   | 686330/10000000 [00:00<00:02, 3499486.65it/s]

0


 15%|███████▊                                              | 1452893/10000000 [00:00<00:02, 3725625.66it/s]

500000500000


 26%|██████████████▏                                       | 2635172/10000000 [00:00<00:01, 3885705.32it/s]

2000001000000


 34%|██████████████████▌                                   | 3443847/10000000 [00:00<00:01, 3964059.64it/s]

4500001500000


 46%|█████████████████████████                             | 4631914/10000000 [00:01<00:01, 3913970.75it/s]

8000002000000


 54%|█████████████████████████████▎                        | 5425966/10000000 [00:01<00:01, 3925653.17it/s]

12500002500000


 66%|███████████████████████████████████▊                  | 6624304/10000000 [00:01<00:00, 3944554.30it/s]

18000003000000
24500003500000


 86%|██████████████████████████████████████████████▍       | 8610858/10000000 [00:02<00:00, 3899253.69it/s]

32000004000000


 94%|██████████████████████████████████████████████████▋   | 9397924/10000000 [00:02<00:00, 3894928.72it/s]

40500004500000


100%|█████████████████████████████████████████████████████| 10000000/10000000 [00:02<00:00, 3872570.82it/s]


## Contrasting Serial and Batch Processing

The function below is based on the Leibniz formula for $\pi$:

$\large\frac{\pi}{4} = 1 - \frac{1}{3} + \frac{1}{5} - \frac{1}{7} + \frac{1}{9} - \cdots = \lim_{n\rightarrow\infty}\sum_{j=0}^{n}\frac{(-1)^j}{2j+1}$

In [31]:
def batch_process_function(row, order, payload):
    """
    Stand-in for a real per-row processing step.

    Returns the sum of the first ``10**order`` terms of the Leibniz series
    4 * (1 - 1/3 + 1/5 - 1/7 + ...), which approaches pi as more terms are
    added. ``row`` and ``payload`` exist only to mimic a realistic
    signature; they are never read.
    """
    total = 0
    sign = 1
    for term in range(10**order):
        denominator = 2 * term + 1
        # Adding the negated term is bit-identical to subtracting it.
        total += sign * 4 / denominator
        sign = -sign
    return total

In [32]:
# Settings: `order` fixes the work per call (10**order series terms),
# `items` holds the N inputs to process.
order = 6
N = 10 ** 3
items = range(0, N)

### Serial

In [15]:
result = [batch_process_function(idx, order, payload=None) for idx in items]

In [16]:
# Every element holds the same approximation of pi, because
# batch_process_function never reads its `row` argument.
result[0]

3.1415916535897743

In [10]:
%%time

# Serial run
result = [batch_process_function(row, order, None) for row in items]

CPU times: user 1min 57s, sys: 558 ms, total: 1min 57s
Wall time: 1min 59s


### Batch

In [17]:
# Only `Parallel`, `delayed` and `Memory` are imported from joblib; the
# module name `joblib` itself is never bound, so `joblib.Parallel` raises
# NameError on a fresh kernel. Use the imported name instead.
result = Parallel(n_jobs=8)(
    delayed(batch_process_function)
    (row, order, None)
    for row in tqdm_.tqdm(items)
)

100%|██████████████████████████████████████████████████████████████████| 1000/1000 [00:23<00:00, 42.85it/s]


In [27]:
# Should match the serial result above: each parallel call computes the
# same series, since the row value is ignored by batch_process_function.
result[0]

3.1415916535897743

In [37]:
%%time

# Parallel using joblib and a progress bar using tqdm
result = Parallel(n_jobs=8)(
    delayed(batch_process_function)
    (row, order, None) 
    for row in tqdm_.tqdm(items)
)

100%|██████████████████████████████████████████████████████████████████| 1000/1000 [00:19<00:00, 50.91it/s]


CPU times: user 648 ms, sys: 110 ms, total: 758 ms
Wall time: 20.2 s


## Things to Be Aware of

- Batch Triggers
- Scheduling
- Exception Alerts

## Serialize per Batch

(Source: https://towardsdatascience.com/parallel-batch-processing-in-python-8dcce607d226)

In [39]:
# Random payload to simulate a model: 500 x 500 x 100 standard-normal values.
matrix = np.random.normal(loc=0.0, scale=1.0, size=(500, 500, 100))

# Use default joblib: one task per item, and the payload is passed along
# with every single task.
result = Parallel(n_jobs=8)(
    delayed(batch_process_function)(idx, order, matrix)
    for idx in tqdm(items)
)

  0%|          | 0/1000 [00:00<?, ?it/s]

In [40]:
n_workers = 8

def proc_batch(batch, order, matrix):
    """Run ``batch_process_function`` on every element of ``batch``.

    ``order`` and ``matrix`` are forwarded unchanged to each call. Results
    are returned as a list in the same order as ``batch``.
    """
    return [batch_process_function(item, order, matrix) for item in batch]

# Divide data into at most `n_workers` contiguous batches.
# math.ceil yields an int directly (no float round-trip through np.ceil,
# matching batch_process below), and max(1, ...) keeps the range() step
# non-zero when `items` is empty.
batch_size = max(1, ceil(len(items) / n_workers))
batches = [
    items[ix:ix + batch_size] for ix in range(0, len(items), batch_size)
]

# divide the work: one task per batch, so `matrix` is passed once per batch
# instead of once per item
result = Parallel(n_jobs=n_workers)(
    delayed(proc_batch)
    (batch, order, matrix)
    for batch in tqdm(batches)
)

  0%|          | 0/8 [00:00<?, ?it/s]

In [41]:
def progress_bar(
    totals: Union[int, List[int]],
    queue : Queue,
) -> None:
    """Drive one or several tqdm bars from messages read off ``queue``.

    Meant to run in a background thread while workers report progress.

    Parameters
    ----------
    totals:
        Either one total (a single shared bar) or a list with one total per
        worker (one bar per worker, labelled ``Worker 1``, ``Worker 2``, ...).
    queue:
        Queue-like object with a blocking ``get()``. Recognised messages:
        ``'update<pid>'`` advances bar ``pid`` by one (or the single bar when
        ``totals`` is an int), and ``'done'`` ends the loop. Other messages
        are ignored.

    The loop also ends if ``queue.get()`` fails because the queue's
    connection is gone (e.g. its manager was shut down). The original bare
    ``except: pass`` turned that case into an endless busy loop. All bars
    are closed on exit, including on error.
    """
    if isinstance(totals, list):
        splitted = True
        pbars = [
            tqdm(
                desc=f'Worker {pid + 1}',
                total=total,
                position=pid,
            )
            for pid, total in enumerate(totals)
        ]
    else:
        splitted = False
        pbars = [
            tqdm(total=totals)
        ]

    try:
        while True:
            try:
                message = queue.get()
            except (EOFError, OSError):
                # Connection to the queue is closed: nothing more can arrive.
                break
            if not isinstance(message, str):
                continue
            if message == 'done':
                break
            if message.startswith('update'):
                try:
                    pid = int(message[6:]) if splitted else 0
                    pbars[pid].update(1)
                except (ValueError, IndexError):
                    # Malformed or unknown worker id. Progress display is
                    # best-effort, so skip it instead of killing the thread.
                    continue
    finally:
        for pbar in pbars:
            pbar.close()

        
def task_wrapper(pid, function, batch, queue, *args, **kwargs):
    """Apply ``function`` to each element of ``batch`` and report progress.

    Each element is called as ``function(element, *args, **kwargs)``. After
    every call the string ``'update<pid>'`` is put on ``queue``. Returns the
    list of results in input order.
    """
    outputs = []
    tick = f'update{pid}'
    for element in batch:
        outputs.append(function(element, *args, **kwargs))
        queue.put(tick)
    return outputs

        
def batch_process(
    items: list,
    function: Callable,
    n_workers: int=8,
    sep_progress: bool=False,
    *args,
    **kwargs,
    ) -> list:
    """Apply ``function`` to every element of ``items`` in parallel batches.

    ``items`` is cut into at most ``n_workers`` contiguous batches. Each batch
    becomes one joblib task running ``task_wrapper``, which calls
    ``function(item, *args, **kwargs)`` per element and posts progress
    messages to a manager queue. A background thread running
    ``progress_bar`` renders those messages.

    Parameters
    ----------
    items:
        Sliceable sequence of inputs (e.g. a list or ``range``).
    function:
        Called once per element.
    n_workers:
        Passed to ``Parallel(n_jobs=...)`` and used to size the batches.
    sep_progress:
        If true, show one progress bar per batch; otherwise a single bar.
    *args, **kwargs:
        Forwarded to ``function``. Extra positional arguments are only
        reachable after supplying ``n_workers`` and ``sep_progress``
        positionally.

    Returns
    -------
    list
        The per-element results, in the same order as ``items``.
    """
    if len(items) == 0:
        # Nothing to do. This also avoids range(0, 0, 0), which raises
        # ValueError because the step would be zero.
        return []

    # Divide data in batches
    batch_size = ceil(len(items) / n_workers)
    batches = [
        items[ix:ix+batch_size]
        for ix in range(0, len(items), batch_size)
    ]

    # Check single or multiple progress bars
    if sep_progress:
        totals = [len(batch) for batch in batches]
    else:
        totals = len(items)

    # The context manager shuts down the manager's server process once we
    # are done, instead of leaving that to garbage collection.
    with Manager() as manager:
        queue = manager.Queue()
        # Start progress bar in separate thread
        progproc = Thread(target=progress_bar, args=(totals, queue))
        progproc.start()
        try:
            # Parallel process the batches
            result = Parallel(n_jobs=n_workers)(
                delayed(task_wrapper)
                (pid, function, batch, queue, *args, **kwargs)
                for pid, batch in enumerate(batches)
            )
        finally:
            # Always stop the progress thread, even if a task raised.
            # Otherwise the non-daemon thread would block on queue.get()
            # forever and keep the interpreter from exiting.
            queue.put('done')
            progproc.join()

    # Flatten the per-batch lists back into one list in input order.
    return [item for sublist in result for item in sublist]

In [42]:
# One progress bar per worker; `order` and `payload` are forwarded to
# batch_process_function through **kwargs.
result = batch_process(
    items,
    batch_process_function,
    n_workers=8,
    sep_progress=True,
    order=6,
    payload=matrix,
)

Worker 1:   0%|          | 0/125 [00:00<?, ?it/s]

Worker 2:   0%|          | 0/125 [00:00<?, ?it/s]

Worker 3:   0%|          | 0/125 [00:00<?, ?it/s]

Worker 4:   0%|          | 0/125 [00:00<?, ?it/s]

Worker 5:   0%|          | 0/125 [00:00<?, ?it/s]

Worker 6:   0%|          | 0/125 [00:00<?, ?it/s]

Worker 7:   0%|          | 0/125 [00:00<?, ?it/s]

Worker 8:   0%|          | 0/125 [00:00<?, ?it/s]