# Batch Processing

## Objectives:
- Understand batch processing and why it is used
- Explore batch processing in Python with the `joblib` library
- Create a batch ETL pipeline to update a model and dashboard

## What is Batch Processing

### Definition
- Jobs that can run without end user interaction, or can be scheduled to run as resources permit
- Used for running high-volume, repetitive data jobs
- Batch processing works in an **automated** way based on a **scheduler**

A useful introductory discussion is available [here](https://www.talend.com/resources/batch-processing/).

#### Batch vs Stream

![img](https://res.cloudinary.com/hevo/images/f_auto,q_auto/v1649315584/hevo-learn/Batch-Processing-Batch-Processing-vs-Stream-Processing/Batch-Processing-Batch-Processing-vs-Stream-Processing.png?_i=AA)

(Source: https://hevodata.com/learn/batch-processing/.)

Batch processing contrasts with *stream* processing, which handles each record as it arrives. Stream processing is critical when reports or analyses must update in real time. When you are working through large volumes of data and the results are not needed immediately, it is often better to process the data in batches.

### Batch size
The batch size refers to the number of work units to be processed within one batch operation. Some examples are:

- The number of lines from a file to load into a database before committing the transaction.
- The number of messages to dequeue from a queue.
- The number of requests to send within one payload.
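
As a minimal sketch of what a batch size means in code, the helper below splits any iterable into lists of at most `batch_size` items. The name `iter_batches` is hypothetical and not part of any library used in this notebook.

```python
from itertools import islice


def iter_batches(items, batch_size):
    """Yield successive lists of at most `batch_size` items from any iterable."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return  # the iterable is exhausted
        yield batch


# Ten work units processed four at a time: the last batch is smaller.
batches = list(iter_batches(range(10), batch_size=4))
print(batches)  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```

Each yielded list could then be committed to a database, dequeued, or sent as one payload, as in the examples above.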

### Common batch processing usage

- Efficient bulk database updates and automated transaction processing, as contrasted to interactive online transaction processing (OLTP) applications.
- The extract, transform, load (ETL) step in populating data warehouses is inherently a batch process in most implementations.
- Performing bulk operations on digital images such as resizing, conversion, watermarking, or otherwise editing a group of image files.
- Converting computer files from one format to another. For example, a batch job may convert proprietary and legacy files to common standard formats for end-user queries and display.

(Source: https://en.wikipedia.org/wiki/Batch_processing.)

In [1]:
# Import Packages
import sqlite3
import time
from joblib import Parallel, delayed, Memory
from tqdm import tqdm

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from prophet import Prophet



## Today's Agenda

Today we will be exploring batch processing through two examples:

1. Use Python's `joblib` package.
1. Create a simple batch ETL pipeline to continuously update a model and deploy it to a dashboard.

## `joblib`

### Advantages

- Disk Caching of Functions & Lazy Re-Evaluation

Cache the results of expensive function calls for later use. Useful during pipeline development.

- Parallel Computing

Execute multiple operations at the same time.

### Caching of Functions

In [2]:
# Return the square of a number:
def square_number(no):
    return no * no

# Compute the squares of a range of numbers, pausing 1 s per item
# to simulate an expensive computation:
def get_square_range(start_no, end_no):
    result = []  # local list, so repeated calls do not accumulate results
    for i in np.arange(start_no, end_no):
        time.sleep(1)
        result.append(square_number(i))
    return result

start = time.time()
# Getting square of 1 to 20:
final_result = get_square_range(1, 21)
end = time.time()

# Total time to compute
print('\nThe function took {:.2f} s to compute.'.format(end - start))
print(final_result)


The function took 20.08 s to compute.
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400]


In [8]:
# COMPLETE: Define a location to store cache
location = '/Library/caches'
memory = Memory(location, verbose=0)

# Compute the squares of a range of numbers (1 s pause per item):
def get_square_range_cached(start_no, end_no):
    result = []  # local list, so the cached value depends only on the arguments
    for i in np.arange(start_no, end_no):
        time.sleep(1)
        result.append(square_number(i))
    return result

# COMPLETE: Cache results of function
get_square_range_cached = memory.cache(get_square_range_cached)

start = time.time()
# Getting the squares of 20 to 40 (first call, so nothing is cached yet):
final_result = get_square_range_cached(20, 41)
end = time.time()

# Total time to compute
print('\nThe function took {:.2f} s to compute.'.format(end - start))
print(final_result)


The function took 21.08 s to compute.
[400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521, 1600]


In [9]:
start = time.time()
# Same call again: the squares of 20 to 40 are now read from the cache
final_result = get_square_range_cached(20, 41)
end = time.time()

print('\nThe function took {:.2f} s to compute.'.format(end - start))
print(final_result)


The function took 0.00 s to compute.
[400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521, 1600]
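
To make the behaviour above less of a black box, here is a minimal sketch of the idea behind disk caching, using only the standard library. It is not how `joblib` implements `Memory`; the decorator `disk_cached`, the temporary cache directory, and the `calls` counter are all assumptions made for illustration.

```python
import hashlib
import pickle
import tempfile
from pathlib import Path

cache_dir = Path(tempfile.mkdtemp())  # throwaway cache directory
calls = {"count": 0}  # how many times the function body really runs


def disk_cached(func):
    """Store each result on disk, keyed by function name and arguments."""
    def wrapper(*args):
        key = hashlib.sha256(pickle.dumps((func.__name__, args))).hexdigest()
        path = cache_dir / f"{key}.pkl"
        if path.exists():
            # Cache hit: load the stored result instead of recomputing.
            return pickle.loads(path.read_bytes())
        value = func(*args)
        path.write_bytes(pickle.dumps(value))
        return value
    return wrapper


@disk_cached
def square_range(start_no, end_no):
    calls["count"] += 1
    return [n * n for n in range(start_no, end_no)]


first = square_range(20, 41)   # computed and written to disk
second = square_range(20, 41)  # same arguments: read back from disk
third = square_range(1, 21)    # new arguments: computed again
print(calls["count"])  # 2
```

The point to take away is that the cache key depends on the arguments, so changing them triggers a fresh computation while repeating them does not.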


### Parallelizing

The function below is based on the Leibniz series for $\pi$:

$\large\frac{\pi}{4} = 1 - \frac{1}{3} + \frac{1}{5} - \frac{1}{7} + \frac{1}{9} - \cdots = \lim_{n\rightarrow\infty}\sum^n_{j=0}\frac{(-1)^j}{2j+1}$
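
As a quick check on how slowly this series converges, the sketch below sums the first `n_terms` terms and compares the error with the alternating-series bound, which is the size of the first omitted term, $4/(2n+1)$. The helper name `leibniz_pi` is hypothetical.

```python
import math


def leibniz_pi(n_terms):
    """Approximate pi with the first `n_terms` terms of the Leibniz series."""
    total = 0.0
    for j in range(n_terms):
        total += (-1) ** j / (2 * j + 1)
    return 4 * total


for order in (2, 4, 6):
    n = 10 ** order
    error = abs(leibniz_pi(n) - math.pi)
    bound = 4 / (2 * n + 1)  # first omitted term of the alternating series
    print(f"n = 10^{order}: error = {error:.2e}, bound = {bound:.2e}")
```

Each extra decimal digit costs roughly ten times more terms, which is what makes the series a convenient stand-in for an expensive job.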

In [10]:
def batch_process_function(row, order, payload):
    """
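    Simulate an expensive per-row processing step by approximating pi
    with the first 10**order terms of the series above.

    `row` and `payload` are ignored; they stand in for real inputs.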
    """
    k, pi = 1, 0
    for i in range(10**order):
        if i % 2 == 0: # even
            pi += 4 / k
        else:  # odd 
            pi -= 4 / k 
        k += 2
    return pi

In [11]:
# Settings
order = 6
N = 1000
items = range(N)

#### Serial

In [12]:
%%time

result = [batch_process_function(row, order, None) for row in items]

CPU times: user 2min 15s, sys: 1.1 s, total: 2min 16s
Wall time: 2min 17s


#### Batch

In [13]:
%%time

# Parallel using joblib and a progress bar using tqdm
result = Parallel(n_jobs=8)(
    delayed(batch_process_function)
    (row, order, None)
    for row in tqdm(items)
)

100%|████████████████████████████████████████████████████████████| 1000/1000 [00:31<00:00, 31.32it/s]


CPU times: user 983 ms, sys: 285 ms, total: 1.27 s
Wall time: 32.4 s
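
The two wall times above can be turned into a speedup and a parallel efficiency with a little arithmetic. The sketch below uses only the timings reported in the cell outputs and the `n_jobs=8` setting; the variable names are ours.

```python
serial_wall_s = 2 * 60 + 17  # "Wall time: 2min 17s" from the serial run
parallel_wall_s = 32.4       # "Wall time: 32.4 s" from the joblib run
n_jobs = 8                   # workers requested in Parallel(n_jobs=8)

speedup = serial_wall_s / parallel_wall_s
efficiency = speedup / n_jobs  # 1.0 would mean perfect scaling

print(f"speedup: {speedup:.2f}x, efficiency: {efficiency:.0%}")
# speedup: 4.23x, efficiency: 53%
```

An efficiency well below 100% is not unusual. Possible reasons include fewer physical cores than workers and the overhead of starting worker processes, although the notebook does not record the hardware, so we cannot say which applies here.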


## Batch ETL Pipeline

Next we will walk through a simple example of a batch ETL pipeline that can be used to update a model and deploy it to a dashboard.

### Scenario

We work for a store that is interested in forecasting their future sales. They have a model that forecasts total daily sales for the upcoming month. They would like us to create a pipeline that will automatically update the model on a weekly basis and deploy the results to a dashboard.

### Tasks:
- Extract recent sales data from database
- Transform to appropriate format for time series model
- Load to "Data Warehouse"
- Train model on most recent data and deploy forecasts to dashboard

In [14]:
sales_con = sqlite3.connect('data/sales.db')
warehouse_con = sqlite3.connect('data/warehouse.db')

In [16]:
sales_df = pd.read_sql('SELECT * FROM customer_sales;', sales_con)
sales_df.head()

|   | order_id | order_date | customer_id | region | product_id | category | sales | quantity |
|---|---|---|---|---|---|---|---|---|
| 0 | CA-2014-103800 | 1/3/2018 | DP-13000 | Central | OFF-PA-10000174 | Office Supplies | 16.448 | 2 |
| 1 | CA-2014-112326 | 1/4/2018 | PO-19195 | Central | OFF-LA-10003223 | Office Supplies | 11.784 | 3 |
| 2 | CA-2014-112326 | 1/4/2018 | PO-19195 | Central | OFF-ST-10002743 | Office Supplies | 272.736 | 3 |
| 3 | CA-2014-141817 | 1/5/2018 | MB-18085 | East | OFF-AR-10003478 | Office Supplies | 19.536 | 3 |
| 4 | CA-2014-130813 | 1/6/2018 | LS-17230 | West | OFF-PA-10002005 | Office Supplies | 19.44 | 3 |


In [17]:
def extract_sales(db_con):
    
    sales_df = pd.read_sql('SELECT * FROM customer_sales;', db_con)
    
    assert isinstance(sales_df, pd.DataFrame)
    
    return sales_df

Transform data for model:
- Aggregate total daily sales
- Format for Prophet model

In [19]:
sales_df['order_date'] = pd.to_datetime(sales_df['order_date'])

In [22]:
# Select the sales column before summing so non-numeric columns are not aggregated
agg_df = sales_df.groupby('order_date')['sales'].sum().reset_index()
agg_df.head()

|   | order_date | sales |
|---|---|---|
| 0 | 2018-01-03 | 16.448 |
| 1 | 2018-01-04 | 284.52 |
| 2 | 2018-01-05 | 19.536 |
| 3 | 2018-01-06 | 455.32 |
| 4 | 2018-01-07 | 87.158 |


In [23]:
agg_df.columns = ['ds', 'y']
agg_df.head()

|   | ds | y |
|---|---|---|
| 0 | 2018-01-03 | 16.448 |
| 1 | 2018-01-04 | 284.52 |
| 2 | 2018-01-05 | 19.536 |
| 3 | 2018-01-06 | 455.32 |
| 4 | 2018-01-07 | 87.158 |


In [24]:
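def transform_sales_data(df):
    
    # Work on a copy so the caller's DataFrame is not modified
    df = df.copy()
    df['order_date'] = pd.to_datetime(df['order_date'])
    
    # Sum only the sales column for each day
    daily_sales = df.groupby('order_date')['sales'].sum().reset_index()
    
    # Prophet expects the columns to be named ds (date) and y (value)
    daily_sales.columns = ['ds', 'y']
    
    assert len(daily_sales.columns) == 2
    
    return daily_sales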
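
To see what the aggregation does without pandas, here is a small standard-library sketch that reproduces the first two daily totals by hand. The three input rows are copied from the `head()` output of `customer_sales` above; the month/day/year date format is an assumption inferred from that output.

```python
from collections import defaultdict
from datetime import datetime

# (order_date, sales) pairs copied from the first three rows shown above
rows = [
    ("1/3/2018", 16.448),
    ("1/4/2018", 11.784),
    ("1/4/2018", 272.736),
]

# Step 1: parse the dates and sum sales per day
daily_totals = defaultdict(float)
for order_date, sales in rows:
    day = datetime.strptime(order_date, "%m/%d/%Y").date()
    daily_totals[day] += sales

# Step 2: rename to the ds / y layout that Prophet expects
prophet_rows = [
    {"ds": day.isoformat(), "y": round(total, 3)}
    for day, total in sorted(daily_totals.items())
]
print(prophet_rows)
# [{'ds': '2018-01-03', 'y': 16.448}, {'ds': '2018-01-04', 'y': 284.52}]
```

The two orders placed on 2018-01-04 collapse into the single value 284.52, matching the second row of the aggregated table.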

Load data into our data warehouse (a SQLite database)

In [27]:
agg_df.to_sql('daily_sales', warehouse_con, index=False)

868

In [30]:
most_recent_date = pd.read_sql('SELECT MAX(ds) FROM daily_sales;', warehouse_con)['MAX(ds)'].values[0]
most_recent_date

'2020-12-14 00:00:00'

In [33]:
new_daily_sales = agg_df.loc[agg_df['ds'] > most_recent_date]

In [34]:
if len(new_daily_sales) > 0:
    new_daily_sales.to_sql('daily_sales', warehouse_con, if_exists='append', index=False)

In [35]:
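def load_sales_to_warehouse(df, db_con):
    
    try:
        most_recent_date = pd.read_sql('SELECT MAX(ds) FROM daily_sales;', db_con)['MAX(ds)'].values[0]

        # Keep only the days that are not in the warehouse yet
        new_daily_sales = df.loc[df['ds'] > most_recent_date]

        if len(new_daily_sales) > 0:
            # Write to the connection passed in, not the global warehouse_con
            new_daily_sales.to_sql('daily_sales', db_con, if_exists='append', index=False)
    except Exception:
        # First run (for example, the table does not exist yet): load everything
        df.to_sql('daily_sales', db_con, if_exists='append', index=False)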
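
The incremental-load logic can be exercised on its own with an in-memory SQLite database. The sketch below is a simplified stand-in, not the function above: `load_incremental` is a hypothetical name, rows are plain tuples rather than a DataFrame, and dates are stored as ISO strings so that string comparison matches date order. The sample values come from the tail of `daily_sales` shown later in this notebook.

```python
import sqlite3


def load_incremental(rows, con):
    """Append only rows whose ds is later than the newest ds already stored."""
    con.execute("CREATE TABLE IF NOT EXISTS daily_sales (ds TEXT, y REAL)")
    most_recent = con.execute("SELECT MAX(ds) FROM daily_sales").fetchone()[0]
    # MAX(ds) is NULL (None) on an empty table, so everything is new
    new_rows = [r for r in rows if most_recent is None or r[0] > most_recent]
    con.executemany("INSERT INTO daily_sales (ds, y) VALUES (?, ?)", new_rows)
    con.commit()
    return len(new_rows)


con = sqlite3.connect(":memory:")
first_batch = [("2020-12-12", 1323.185), ("2020-12-13", 63.88)]
second_batch = first_batch + [("2020-12-14", 822.894)]

inserted_first = load_incremental(first_batch, con)    # 2 rows, table was empty
inserted_second = load_incremental(second_batch, con)  # 1 row, only the new day
total_rows = con.execute("SELECT COUNT(*) FROM daily_sales").fetchone()[0]
print(inserted_first, inserted_second, total_rows)  # 2 1 3
```

Running the same batch twice therefore does not duplicate days, which is the property a weekly scheduled job relies on.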

Model new data and update forecasts

In [37]:
model = Prophet()
model.add_country_holidays(country_name='US')
model.add_seasonality(name='monthly', period=30.5, fourier_order=4)

<prophet.forecaster.Prophet at 0x7ffe1c8c6ce0>

In [38]:
model_df = pd.read_sql('SELECT * FROM daily_sales;', warehouse_con)
model_df.head()

|   | ds | y |
|---|---|---|
| 0 | 2018-01-03 00:00:00 | 16.448 |
| 1 | 2018-01-04 00:00:00 | 284.52 |
| 2 | 2018-01-05 00:00:00 | 19.536 |
| 3 | 2018-01-06 00:00:00 | 455.32 |
| 4 | 2018-01-07 00:00:00 | 87.158 |


In [40]:
model_df.tail()

|     | ds | y |
|---|---|---|
| 863 | 2020-12-10 00:00:00 | 1348.37 |
| 864 | 2020-12-11 00:00:00 | 4167.62 |
| 865 | 2020-12-12 00:00:00 | 1323.185 |
| 866 | 2020-12-13 00:00:00 | 63.88 |
| 867 | 2020-12-14 00:00:00 | 822.894 |


In [39]:
model.fit(model_df)

11:16:21 - cmdstanpy - INFO - Chain [1] start processing
11:16:21 - cmdstanpy - INFO - Chain [1] done processing


<prophet.forecaster.Prophet at 0x7ffe1c8c6ce0>

In [41]:
future = model.make_future_dataframe(periods=30)
future.tail()

|     | ds |
|---|---|
| 893 | 2021-01-09 |
| 894 | 2021-01-10 |
| 895 | 2021-01-11 |
| 896 | 2021-01-12 |
| 897 | 2021-01-13 |
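
The tail of `future` can be checked by hand. The sketch below assumes the default daily frequency of `make_future_dataframe` and that the history rows are kept in the frame; the 868 history rows and the last observed date come from the outputs above.

```python
from datetime import date, timedelta

last_observed = date(2020, 12, 14)  # most recent ds in daily_sales
horizon_days = 30                   # periods=30
history_rows = 868                  # rows written to daily_sales

# One new row per calendar day after the last observation
future_days = [last_observed + timedelta(days=k) for k in range(1, horizon_days + 1)]

last_index = history_rows + horizon_days - 1
print(future_days[0], future_days[-1], last_index)
# 2020-12-15 2021-01-13 897
```

Both the final date and the final index agree with the table above, and the same row count is what the `assert` in the forecasting function checks.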


In [43]:
forecast = model.predict(future)
forecast.tail(30)[['ds', 'yhat']]

|     | ds | yhat |
|---|---|---|
| 868 | 2020-12-15 | 1100.915749 |
| 869 | 2020-12-16 | 1178.343819 |
| 870 | 2020-12-17 | 1100.773295 |
| 871 | 2020-12-18 | 1190.92656 |
| 872 | 2020-12-19 | 1210.664972 |
| 873 | 2020-12-20 | 1071.674174 |
| 874 | 2020-12-21 | 806.540141 |
| 875 | 2020-12-22 | 1057.901224 |
| 876 | 2020-12-23 | 1118.752971 |
| 877 | 2020-12-24 | 1016.014012 |


In [44]:
def model_and_forecast_sales(db_con):
    
    model = Prophet()
    model.add_country_holidays(country_name='US')
    model.add_seasonality(name='monthly', period=30.5, fourier_order=4)
    
    model_df = pd.read_sql('SELECT * FROM daily_sales;', db_con)
    
    model.fit(model_df)
    
    future = model.make_future_dataframe(periods=30)
    
    forecast = model.predict(future)
    
    assert len(forecast) == len(model_df) + 30
    
    return forecast
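
With the four steps wrapped in functions, one way to tie them together is a single entry point that a scheduler could call once a week. The sketch below is an assumption about how that might look, not part of the original notebook: `run_pipeline` is a hypothetical name, and the steps are passed in as arguments so the wiring can be tried with stand-in functions.

```python
def run_pipeline(extract, transform, load, forecast, source_con, warehouse_con):
    """Run extract -> transform -> load -> forecast once and return the forecast."""
    raw = extract(source_con)
    daily = transform(raw)
    load(daily, warehouse_con)
    return forecast(warehouse_con)


# Stand-in steps that only record the order in which they run
calls = []

def fake_extract(con):
    calls.append("extract")
    return [("1/3/2018", 16.448)]

def fake_transform(rows):
    calls.append("transform")
    return rows

def fake_load(rows, con):
    calls.append("load")

def fake_forecast(con):
    calls.append("forecast")
    return "forecast placeholder"

result = run_pipeline(fake_extract, fake_transform, fake_load, fake_forecast, None, None)
print(calls)  # ['extract', 'transform', 'load', 'forecast']
```

In this notebook the call would be `run_pipeline(extract_sales, transform_sales_data, load_sales_to_warehouse, model_and_forecast_sales, sales_con, warehouse_con)`.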