In [2]:
import numba
import numpy as np
import pandas as pd

Author: Li Jin 
Last updated: 20200911

# About this Notebook

This notebook builds a POC implementation of a Numba engine for window summarization and compares the results of the current engine (Python) against the Numba engine.

# Background
The paper https://dl.acm.org/doi/pdf/10.14778/2824032.2824045 proposes an architecture for UDF-centric workflows that allows the UDFs to be compiled
together with the framework's execution loop. The concept that this notebook explores - a Numba engine for Bamboo - is an implementation of that architecture.

# What is Numba engine for window summarization
Currently, when we loop over each window, the for loop is executed in the Python interpreter. With the Numba engine, the for loop is compiled by Numba and executed in C.
The Numba engine is only possible when the window UDF itself can be jitted by Numba to C code.

# Why Numba engine
Numba is emerging as a promising way to compile Python code to something much faster, e.g., C code or CUDA code. Exposing this could open opportunities for developers and researchers to write high-performance window functions.

# Why care about this now
We are currently releasing an API for defining "numpy" UDF. This POC will guide us towards defining the data structure for "numpy" UDF to be future compatible.

# Summary
* A Numba engine can be done. A single-threaded Numba engine brings about a 2-3x perf gain for the example function. With multi-threading, the Numba engine brings another 1-2x perf gain.
* The perf gain of jitting the window UDF itself is about 3-5x.
* Numba has good support for NumPy structured arrays. A structured array offers functionality similar to pd.DataFrame and supports multiple datatypes (multiple datatypes in a single struct array). I think we should use it as the data structure to represent a "DataFrame" in "numpy" UDFs.
* After talking to Jeff Reback, I found pandas is already doing some of these optimizations in the upcoming version: https://github.com/pandas-dev/pandas/pull/35759/files#diff-faca0f5129dcc0262a4435580a299926R151

# Question

* Why does a jitted function outperform an unjitted function, even though the function mostly uses vectorized operations? (See my_udf)
* Is it possible to use the CUDA backend for Numba and combine both the framework code and the UDF code to be executed on the GPU? If so, what restrictions does that bring?
* Do multi-column UDFs work (UDFs that return multiple columns)?


In [3]:
def my_udf(data):
    """Example window UDF: a ratio of column sums chosen by the last row's tag.

    ``data`` is a numpy structured array
    (https://numpy.org/doc/stable/user/basics.rec.html) with fields
    ``v1``, ``v2`` and ``v3``.

    Returns ``sum(v1) / sum(v2)`` when the ``v3`` value of the last record
    equals ``'some_str'``, and ``sum(v2) / sum(v1)`` otherwise.
    """
    # The tag of the most recent record decides which ratio is reported.
    last_tag = data[-1]['v3']
    v1_total = data['v1'].sum()
    v2_total = data['v2'].sum()
    if last_tag == 'some_str':
        return v1_total / v2_total
    return v2_total / v1_total

# Compiled (nopython-mode) variant of my_udf; the undecorated my_udf is kept
# so that the two can be benchmarked against each other.
my_udf_jit = numba.njit(my_udf)

In [4]:
# Prepare data for testing

num = 1000000
window_size = 1000

v1 = np.arange(num) + 1
v2 = v1.astype('double') + 1000
# Alternate the two tags; np.resize repeats the pair until there are exactly
# `num` entries, so the length is right even when `num` is odd.
v3 = np.resize(np.array(['some_str', 'some_other_str']), num)

data_dtype = np.dtype(
    {
        'names': ['v1', 'v2', 'v3'],
        # Limitation - the size of the string field needs to be known up front
        # ('U16' holds at most 16 unicode characters).
        # np.int64 is spelled out because the `np.int` alias was deprecated in
        # NumPy 1.20 and removed in NumPy 1.24.
        'formats': [np.int64, np.double, 'U16']
    }
)

# Size the structured array from `num` so the cell stays consistent when
# `num` is changed.
data = np.zeros(num, dtype=data_dtype)
data['v1'] = v1
data['v2'] = v2
data['v3'] = v3

# Window i is data[lower_indices[i]:upper_indices[i]]: row i plus up to
# window_size - 1 rows before it (fewer at the start, where lower is clipped to 0).
upper_indices = np.arange(num) + 1
lower_indices = np.clip(upper_indices - window_size, a_min=0, a_max=None)

In [5]:
# This is a structured array: each record carries the fields v1, v2 and v3.
# Displaying it as the last expression shows a truncated repr plus the dtype.
data

array([(      1,    1001., 'some_str'),
       (      2,    1002., 'some_other_str'),
       (      3,    1003., 'some_str'), ...,
       ( 999998, 1000998., 'some_other_str'),
       ( 999999, 1000999., 'some_str'),
       (1000000, 1001000., 'some_other_str')],
      dtype=[('v1', '<i8'), ('v2', '<f8'), ('v3', '<U16')])

In [6]:
# Simplified implementation of window summarization

def _rolling_window_summarize(func, data, lower_indices, upper_indices):
    result = np.zeros(len(data))
    for i in range(len(lower_indices)):
        lower = lower_indices[i]
        upper = upper_indices[i]
        result[i] = func(data[lower: upper])
    return result

@numba.njit
def _rolling_window_summarize_jit(func, data, lower_indices, upper_indices):
    """Apply ``func`` to every window of ``data`` inside a Numba-compiled loop.

    Window ``i`` is the slice ``data[lower_indices[i]:upper_indices[i]]``.
    ``func`` is called from compiled code, so it must itself be a
    Numba-jitted function.

    Returns a float64 array with one entry per window.
    """
    num_windows = len(lower_indices)
    # One output slot per window, not per data row. Compiled code does not
    # bounds-check by default, so an undersized result array would be written
    # past its end instead of raising.
    result = np.zeros(num_windows)
    # Plain range: this function is compiled without parallel=True, where
    # numba.prange is documented to behave exactly like range anyway.
    for i in range(num_windows):
        lower = lower_indices[i]
        upper = upper_indices[i]
        result[i] = func(data[lower: upper])
    return result

@numba.njit(parallel=True)
def _rolling_window_summarize_jit_parallel(func, data, lower_indices, upper_indices):
    """Apply ``func`` to every window of ``data`` in a parallel Numba loop.

    Window ``i`` is the slice ``data[lower_indices[i]:upper_indices[i]]``.
    ``func`` is called from compiled code, so it must itself be a
    Numba-jitted function.

    Returns a float64 array with one entry per window.
    """
    num_windows = len(lower_indices)
    # One output slot per window, not per data row. Compiled code does not
    # bounds-check by default, so an undersized result array would be written
    # past its end instead of raising.
    result = np.zeros(num_windows)
    # Each iteration writes only result[i], so iterations are independent and
    # prange may distribute them across threads.
    for i in numba.prange(num_windows):
        lower = lower_indices[i]
        upper = upper_indices[i]
        result[i] = func(data[lower: upper])
    return result

def rolling_window_summarize(func, data, lower_indices, upper_indices, engine):
    """Summarize each window of ``data`` with ``func`` using the chosen engine.

    Parameters
    ----------
    func : callable
        Window UDF. For the ``'numba'`` and ``'numba-parallel'`` engines it is
        passed into a Numba-compiled loop.
    data, lower_indices, upper_indices
        Forwarded unchanged to the selected engine implementation.
    engine : str
        ``'python'``, ``'numba'`` or ``'numba-parallel'``.

    Returns
    -------
    The array returned by the selected engine implementation.

    Raises
    ------
    ValueError
        If ``engine`` is not one of the supported names (previously this case
        silently returned ``None``).
    """
    if engine == 'python':
        return _rolling_window_summarize(func, data, lower_indices, upper_indices)
    elif engine == 'numba':
        return _rolling_window_summarize_jit(func, data, lower_indices, upper_indices)
    elif engine == 'numba-parallel':
        return _rolling_window_summarize_jit_parallel(func, data, lower_indices, upper_indices)
    else:
        raise ValueError(
            f"Unknown engine {engine!r}; expected 'python', 'numba' or 'numba-parallel'"
        )


# Micro Benchmark

In [7]:
%%time
# Baseline: plain-Python UDF driven by the plain-Python window loop.
rolling_window_summarize(my_udf, data, lower_indices, upper_indices, engine='python')

CPU times: user 16.9 s, sys: 20.1 ms, total: 16.9 s
Wall time: 17 s


array([9.99000999e-04, 6.67666667e+02, 1.99600798e-03, ...,
       1.00100050e+00, 9.99000499e-01, 1.00100050e+00])

In [8]:
%%time
# Jitted UDF, still driven by the plain-Python window loop.
# NOTE(review): the first call to a jitted function presumably includes its
# compilation time in this measurement - confirm.
rolling_window_summarize(my_udf_jit, data, lower_indices, upper_indices, engine='python')

CPU times: user 4.36 s, sys: 56.3 ms, total: 4.42 s
Wall time: 4.44 s


array([9.99000999e-04, 6.67666667e+02, 1.99600798e-03, ...,
       1.00100050e+00, 9.99000499e-01, 1.00100050e+00])

In [9]:
%%time
# Jitted UDF driven by the jitted window loop (compiled without parallel=True).
# NOTE(review): the first call presumably includes JIT compilation of the
# loop in this measurement - confirm.
rolling_window_summarize(my_udf_jit, data, lower_indices, upper_indices, engine='numba')

CPU times: user 2.2 s, sys: 7.39 ms, total: 2.21 s
Wall time: 2.22 s


array([9.99000999e-04, 6.67666667e+02, 1.99600798e-03, ...,
       1.00100050e+00, 9.99000499e-01, 1.00100050e+00])

In [10]:
%%time
# Jitted UDF driven by the jitted window loop compiled with parallel=True.
# NOTE(review): the first call presumably includes JIT compilation of the
# loop in this measurement - confirm.
rolling_window_summarize(my_udf_jit, data, lower_indices, upper_indices, engine='numba-parallel')



CPU times: user 4.8 s, sys: 7.28 ms, total: 4.81 s
Wall time: 1.17 s


array([9.99000999e-04, 6.67666667e+02, 1.99600798e-03, ...,
       1.00100050e+00, 9.99000499e-01, 1.00100050e+00])