# Bodo Getting Started Tutorial

Bodo is the simplest and most efficient analytics engine. It accelerates and scales data science programs
automatically and enables instant deployment, eliminating the need to rewrite Python analytics code to Spark/Scala, SQL or MPI/C++.
In this tutorial, we will cover the basics of using Bodo and explain its important concepts.

In a nutshell, Bodo provides a just-in-time (JIT) compilation workflow using the `@bodo.jit` decorator. It replaces decorated Python functions with an optimized and parallelized binary version using advanced compilation methods.

Let's get started!

## Environment Setup
Please follow the [Bodo installation](http://docs.bodo.ai/latest/source/install.html) and [Jupyter Notebook Setup](http://docs.bodo.ai/latest/source/jupyter.html) pages to set up the environment. Also, make sure MPI engines are started in the `IPython Clusters` tab (or by running `ipcluster start -n 4 --profile=mpi` on the command line), then initialize the `ipyparallel` environment:

In [1]:
import ipyparallel as ipp
c = ipp.Client(profile='mpi')
view = c[:]
view.activate()

## Parallel Pandas with Bodo
First, we demonstrate how Bodo automatically parallelizes and optimizes standard Python programs that make use of pandas and NumPy, without the need to rewrite your code. Bodo can scale your analytics code to thousands of cores, providing orders of magnitude speed up depending on program characteristics.

### Generate data
To begin, let's generate a simple dataset (the size of this dataframe in memory is approximately 305 MB, and the size of the written Parquet file is 77 MB):

In [2]:
import pandas as pd
import numpy as np

NUM_GROUPS = 30
NUM_ROWS = 20_000_000
df = pd.DataFrame({
    "A": np.arange(NUM_ROWS) % NUM_GROUPS,
    "B": np.arange(NUM_ROWS)
})
df.to_parquet("example1.pq")
print(df)

           A         B
0          0         0
1          1         1
2          2         2
3          3         3
4          4         4
...       ..       ...
19999995  15  19999995
19999996  16  19999996
19999997  17  19999997
19999998  18  19999998
19999999  19  19999999

[20000000 rows x 2 columns]


### Data Analysis
Now let's read and process this dataframe. First using Python and pandas:

In [3]:
def test():
    df = pd.read_parquet("example1.pq")
    df2 = df.groupby("A").sum()
    m = df2.B.max()
    print(m)

test()

6666676000003
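
The printed value can be checked by hand. Since 20,000,000 = 30 × 666,666 + 20, groups 0 through 19 each receive 666,667 rows while groups 20 through 29 receive 666,666, so the largest sum should belong to group 19. The sketch below reproduces the arithmetic in plain Python, without pandas; the helper name `group_sum` is ours and is not part of the tutorial's code.

```python
NUM_GROUPS = 30
NUM_ROWS = 20_000_000

def group_sum(g, num_rows=NUM_ROWS, num_groups=NUM_GROUPS):
    """Sum of all B values whose row index is congruent to g modulo num_groups."""
    # Rows in group g are g, g + num_groups, g + 2*num_groups, ... below num_rows.
    count = (num_rows - 1 - g) // num_groups + 1
    last = g + (count - 1) * num_groups
    # Arithmetic series: count * (first + last) / 2
    return count * (g + last) // 2

sums = {g: group_sum(g) for g in range(NUM_GROUPS)}
best = max(sums, key=sums.get)
print(best, sums[best])  # 19 6666676000003
```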


Now let's run it with Bodo in parallel. To do this, all that we have to do is add the `bodo.jit` decorator to the function, and run the program with MPI (on Jupyter Notebook, use the `%%px --block` *magic* to run on MPI engines):

In [4]:
%%px --block
import pandas as pd
import bodo

@bodo.jit
def test():
    df = pd.read_parquet("example1.pq")
    df2 = df.groupby("A").sum()
    m = df2.B.max()
    print(m)

test()

[stdout:0] 6666676000003


Although the program appears to be a regular sequential Python program, Bodo compiles and *transforms* the decorated code (the `test` function in this example) under the hood, so that it can run in parallel on many cores. Each core operates on a different chunk of the data and communicates with other cores when necessary.

### Parallel Python Processes
With Bodo, all processes are running the same code. Bodo manages parallelism inside `jit` functions to match sequential Python as much as possible. On the other hand, the code outside `jit` functions runs as regular Python on all processes. For example, the code below when run on 4 processes produces 4 prints, one for each Python process:

In [5]:
%%px --block

@bodo.jit
def test():
    df = pd.read_parquet("example1.pq")
    df2 = df.groupby("A").sum()
    m = df2.B.max()
    return m

m = test()
print(m)

[stdout:0] 6666676000003
[stdout:1] 6666676000003
[stdout:2] 6666676000003
[stdout:3] 6666676000003


### Prints
Bodo prints replicated values like `m` only once (on process `0`) to avoid redundant printing, but we can use `bodo.parallel_print` to see prints on all processes:

In [6]:
%%px --block

@bodo.jit
def test():
    df = pd.read_parquet("example1.pq")
    df2 = df.groupby("A").sum()
    m = df2.B.max()
    bodo.parallel_print(m)

test()

[stdout:0] 6666676000003
[stdout:1] 6666676000003
[stdout:2] 6666676000003
[stdout:3] 6666676000003


### Parallel Data Read

Bodo can read data from storage such as Parquet files in parallel. This means that each process reads only its own chunk of data (which can be proportionally faster than sequential read). The example below demonstrates parallel read by printing data chunks on different cores:

In [7]:
%%px --block

@bodo.jit
def test():
    df = pd.read_parquet("example1.pq")
    print(df)

test()

[stdout:0] 
          A        B
0         0        0
1         1        1
2         2        2
3         3        3
4         4        4
...      ..      ...
4999995  15  4999995
4999996  16  4999996
4999997  17  4999997
4999998  18  4999998
4999999  19  4999999

[5000000 rows x 2 columns]
[stdout:1] 
          A        B
5000000  20  5000000
5000001  21  5000001
5000002  22  5000002
5000003  23  5000003
5000004  24  5000004
...      ..      ...
9999995   5  9999995
9999996   6  9999996
9999997   7  9999997
9999998   8  9999998
9999999   9  9999999

[5000000 rows x 2 columns]
[stdout:2] 
           A         B
10000000  10  10000000
10000001  11  10000001
10000002  12  10000002
10000003  13  10000003
10000004  14  10000004
...       ..       ...
14999995  25  14999995
14999996  26  14999996
14999997  27  14999997
14999998  28  14999998
14999999  29  14999999

[5000000 rows x 2 columns]
[stdout:3] 
           A         B
15000000   0  15000000
15000001   1  15000001
15000002   2  15000

Looking at column B, we can clearly see that each process has a separate chunk of the original dataframe. 
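
The row ranges in this output are consistent with an even block split of the 20,000,000 rows over 4 processes. As a rough illustration (a sketch of one common block-partitioning scheme, not necessarily what Bodo does internally), the hypothetical helper `chunk_bounds` below computes the half-open row range a given rank would own:

```python
def chunk_bounds(n_rows, n_ranks, rank):
    """Return the half-open row range [start, stop) owned by `rank`."""
    base, extra = divmod(n_rows, n_ranks)
    # The first `extra` ranks take one additional row when the split is uneven.
    start = rank * base + min(rank, extra)
    stop = start + base + (1 if rank < extra else 0)
    return start, stop

for rank in range(4):
    print(rank, chunk_bounds(20_000_000, 4, rank))
```

With these inputs, rank 1 owns rows 5,000,000 up to (but not including) 10,000,000, matching the `[stdout:1]` chunk above.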

### Parallelizing Computation

![Groupby shuffle communication pattern](img/groupby.jpg)

Bodo parallelizes computation automatically by dividing the work between cores and performing the necessary data communication. For example, the `groupby` operation in our example needs the data of each group to be on the same processor. This requires *shuffling* data across the cluster. Bodo uses [MPI](https://en.wikipedia.org/wiki/Message_Passing_Interface) for efficient communication, which is usually much faster than alternative methods.
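
To make the shuffle concrete, here is a small single-process simulation in plain Python. It is only a sketch of the general idea (send every row to the rank chosen by its key, then aggregate locally). The modulo-based rank assignment and the function name `simulate_groupby_sum` are illustrative assumptions; they do not describe how Bodo assigns keys or communicates over MPI.

```python
from collections import defaultdict

def simulate_groupby_sum(chunks, n_ranks):
    """chunks[r] is the list of (key, value) rows initially held by rank r."""
    # Shuffle step: each row is routed to the rank that owns its key.
    inbox = [[] for _ in range(n_ranks)]
    for rows in chunks:
        for key, value in rows:
            inbox[key % n_ranks].append((key, value))
    # Local step: every rank now holds all rows of its keys and can sum them.
    results = []
    for rows in inbox:
        sums = defaultdict(int)
        for key, value in rows:
            sums[key] += value
        results.append(dict(sums))
    return results

# 12 rows with A = B % 3, split evenly across 2 simulated ranks.
rows = [(b % 3, b) for b in range(12)]
chunks = [rows[:6], rows[6:]]
result = simulate_groupby_sum(chunks, n_ranks=2)
print(result)  # [{0: 18, 2: 26}, {1: 22}]
```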

### Parallel Write

Bodo can write data to storage in parallel as well:

In [8]:
%%px --block

@bodo.jit
def test():
    df = pd.read_parquet("example1.pq")
    df2 = df.groupby("A").sum()
    df2.to_parquet("example1-df2.pq")

test()

Now let's read and print the results with pandas:

In [9]:
import pandas as pd

df = pd.read_parquet("example1-df2.pq")
print(df)

                B
A                
0   6666663333330
4   6666665999998
6   6666667333332
16  6666674000002
20  6666656666670
24  6666659333334
28  6666661999998
1   6666663999997
7   6666667999999
8   6666668666666
11  6666670666667
12  6666671333334
13  6666672000001
15  6666673333335
18  6666675333336
5   6666666666665
19  6666676000003
21  6666657333336
22  6666658000002
23  6666658666668
29  6666662666664
2   6666664666664
3   6666665333331
9   6666669333333
10  6666670000000
14  6666672666668
17  6666674666669
25  6666660000000
26  6666660666666
27  6666661333332


The order of the `groupby` results generated by Bodo can differ from pandas since Bodo doesn't automatically sort the output distributed data (it is expensive and not necessary in many cases). Users can explicitly sort dataframes at any point if desired.
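
For instance, when comparing against pandas output, sorting by the group key restores the familiar order. The snippet below uses plain pandas on a few rows copied from the output above; it is a minimal illustration and assumes nothing about Bodo beyond the result it wrote.

```python
import pandas as pd

# A few rows from the Bodo-written result, in the order they appeared above.
df = pd.DataFrame(
    {"B": [6666663333330, 6666665999998, 6666667333332, 6666663999997]},
    index=pd.Index([0, 4, 6, 1], name="A"),
)
sorted_df = df.sort_index()
print(sorted_df)
```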

### Specifying Data Distribution

Bodo automatically distributes data and computation in Bodo functions by analyzing them for parallelization. However, Bodo does not know how input parameters of Bodo functions are distributed, and similarly how the user wants to handle return values. As such, Bodo assumes that input parameters and return values are *replicated* by default, meaning that every process receives the same input data and returns the same output, as opposed to different data chunks.

<div class="alert alert-block alert-danger">
<b>Important:</b> The distribution scheme of input parameters and return values determines the distribution scheme for variables inside the Bodo function that depend on them.
</div>

To illustrate this effect, let's return the `groupby` output from the Bodo function:

In [10]:
%%px --block
import pandas as pd
import bodo

@bodo.jit
def test():
    df = pd.read_parquet("example1.pq")
    df2 = df.groupby("A").sum()
    return df2

df2 = test()
print(df2)

[stdout:0] 
                B
A                
0   6666663333330
1   6666663999997
2   6666664666664
3   6666665333331
4   6666665999998
5   6666666666665
6   6666667333332
7   6666667999999
8   6666668666666
9   6666669333333
10  6666670000000
11  6666670666667
12  6666671333334
13  6666672000001
14  6666672666668
15  6666673333335
16  6666674000002
17  6666674666669
18  6666675333336
19  6666676000003
20  6666656666670
21  6666657333336
22  6666658000002
23  6666658666668
24  6666659333334
25  6666660000000
26  6666660666666
27  6666661333332
28  6666661999998
29  6666662666664
[stdout:1] 
                B
A                
0   6666663333330
1   6666663999997
2   6666664666664
3   6666665333331
4   6666665999998
5   6666666666665
6   6666667333332
7   6666667999999
8   6666668666666
9   6666669333333
10  6666670000000
11  6666670666667
12  6666671333334
13  6666672000001
14  6666672666668
15  6666673333335
16  6666674000002
17  6666674666669
18  6666675333336
19  6666676000003
20  

[stderr:0] 
  "information.".format(self.func_ir.func_id.func_name)


As we can see, `df2` has the same data on every process. Furthermore, Bodo warns that it didn't find any parallelism inside the `test` function. In this example, every process reads the whole input Parquet file and executes the same sequential program. The reason is that the return value `df2` is replicated by default, and Bodo gives every variable that `df2` depends on (here, `df`) the same distribution, so the replication cascades backward through the function.

### `distributed` Flag

The user can tell Bodo what input/output variables should be distributed using the `distributed` flag:

In [11]:
%%px --block

@bodo.jit(distributed=["df2"])
def test():
    df = pd.read_parquet("example1.pq")
    df2 = df.groupby("A").sum()
    return df2

df2 = test()
print(df2)

[stdout:0] 
                B
A                
0   6666663333330
4   6666665999998
6   6666667333332
16  6666674000002
20  6666656666670
24  6666659333334
28  6666661999998
[stdout:1] 
                B
A                
1   6666663999997
7   6666667999999
8   6666668666666
11  6666670666667
12  6666671333334
13  6666672000001
15  6666673333335
18  6666675333336
[stdout:2] 
                B
A                
5   6666666666665
19  6666676000003
21  6666657333336
22  6666658000002
23  6666658666668
29  6666662666664
[stdout:3] 
                B
A                
2   6666664666664
3   6666665333331
9   6666669333333
10  6666670000000
14  6666672666668
17  6666674666669
25  6666660000000
26  6666660666666
27  6666661333332


In this case, the program is fully parallelized and chunks of data are returned to Python on different processes.
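
One way to read this output is that the four chunks partition the replicated result shown earlier: every group key appears on exactly one process, although the chunks need not be equal in size. The quick check below copies the keys from the output above into plain Python lists (the variable names are ours):

```python
# Group keys printed by each process in the output above.
keys_per_rank = [
    [0, 4, 6, 16, 20, 24, 28],
    [1, 7, 8, 11, 12, 13, 15, 18],
    [5, 19, 21, 22, 23, 29],
    [2, 3, 9, 10, 14, 17, 25, 26, 27],
]

all_keys = [k for keys in keys_per_rank for k in keys]
no_duplicates = len(all_keys) == len(set(all_keys))
covers_all_groups = sorted(all_keys) == list(range(30))
print(no_duplicates, covers_all_groups)  # True True
```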

### Basic benchmarking of the pandas example
Now let's do some basic benchmarking to observe the effect of Bodo's automatic parallelization. Here we are only scaling up to a few cores, but Bodo can scale the same code to thousands of cores in a cluster.

Let's add timers and run the code again with pandas:

In [14]:
import pandas as pd
import time

def test():
    df = pd.read_parquet("example1.pq")
    t0 = time.time()
    df2 = df.groupby("A").sum()
    m = df2.B.max()
    print("Compute time:", time.time() - t0, "secs")
    return m

result = test()

Compute time: 0.7629697322845459 secs


Now let's measure Bodo's execution time.

In [17]:
%%px --block
import time

@bodo.jit
def test():
    df = pd.read_parquet("example1.pq")
    t0 = time.time()
    df2 = df.groupby("A").sum()
    m = df2.B.max()
    print("Compute time:", time.time() - t0, "secs")
    return m

result = test()

[stdout:0] Compute time: 0.15030407905578613 secs


As we can see, Bodo computes results faster than pandas using parallel computation. The speedup depends on the data and program characteristics, as well as the number of cores used. Usually, we can continue scaling to many more cores as long as the data is large enough.

Note how we included timers inside the Bodo function. This avoids measuring compilation time since Bodo compiles each `jit` function the first time it is called. Not measuring compilation time in benchmarking is usually important since:

1. Compilation time is often not significant for large computations in real settings, but simple benchmarks are designed to run quickly, so compilation would dominate their measured time
2. Functions can potentially be compiled and cached ahead of execution time
3. Compilation happens only once but the same function may be called multiple times, leading to inconsistent measurements
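
When timers have to stay outside the function, a common alternative is to call the function once as a warm-up and time only later calls. The sketch below shows the pattern with a stand-in decorator, `fake_jit`, that merely sleeps on the first call to mimic a one-time compilation cost. It is a hypothetical helper for illustration, not a Bodo API.

```python
import time

def fake_jit(func):
    """Stand-in for a JIT decorator: pays a one-time delay on the first call."""
    compiled = False

    def wrapper(*args):
        nonlocal compiled
        if not compiled:
            time.sleep(0.2)  # pretend compilation cost
            compiled = True
        return func(*args)

    return wrapper

@fake_jit
def total(n):
    return sum(range(n))

def timed(func, *args):
    """Return the wall-clock seconds taken by one call of func(*args)."""
    t0 = time.perf_counter()
    func(*args)
    return time.perf_counter() - t0

first = timed(total, 100_000)   # includes the simulated compilation
second = timed(total, 100_000)  # steady-state execution only
print("first call slower than second:", first > second)
```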

### Pandas User-Defined Functions
User-defined functions (UDFs) offer significant flexibility but have high overhead in pandas. Bodo can accelerate UDFs significantly, allowing flexibility without performance overheads. Let's modify our example to use UDFs and measure performance again:

In [16]:
def test():
    df = pd.read_parquet("example1.pq")
    t0 = time.time()
    df2 = df.groupby("A")["B"].agg((lambda a: (a==1).sum(), lambda a: (a==2).sum(), lambda a: (a==3).sum()))
    m = df2.mean()
    print("Compute time:", time.time() - t0, "secs")
    return m

result = test()

Compute time: 3.4350690841674805 secs


Running this example with Bodo is significantly faster, even on a single core:

In [19]:
import bodo

@bodo.jit
def test():
    df = pd.read_parquet("example1.pq")
    t0 = time.time()
    df2 = df.groupby("A")["B"].agg((lambda a: (a==1).sum(), lambda a: (a==2).sum(), lambda a: (a==3).sum()))
    m = df2.mean()
    print("Compute time:", time.time() - t0, "secs")
    return m

result = test()

Compute time: 0.7699737200746313 secs


Bodo's parallelism improves performance further:

In [17]:
%%px --block

@bodo.jit
def test():
    df = pd.read_parquet("example1.pq")
    t0 = time.time()
    df2 = df.groupby("A")["B"].agg((lambda a: (a==1).sum(), lambda a: (a==2).sum(), lambda a: (a==3).sum()))
    m = df2.mean()
    print("Compute time:", time.time() - t0, "secs")
    return m

result = test()

[stdout:0] Compute time: 0.2205293399747461 secs


### Memory Optimizations in Bodo
Bodo also improves performance by eliminating intermediate array values in computations such as pandas and NumPy expressions. The Monte Carlo Pi Estimation example demonstrates this effect:

In [20]:
import numpy as np

def calc_pi(n):
    t1 = time.time()
    x = 2 * np.random.ranf(n) - 1
    y = 2 * np.random.ranf(n) - 1
    pi = 4 * np.sum(x**2 + y**2 < 1) / n
    print("Execution time:", time.time()-t1, "\nresult:", pi)

calc_pi(2 * 10**8)

Execution time: 9.244210243225098 
result: 3.14168272


Bodo is faster even on a single core since it avoids creating arrays altogether:

In [21]:
@bodo.jit
def calc_pi(n):
    t1 = time.time()
    x = 2 * np.random.ranf(n) - 1
    y = 2 * np.random.ranf(n) - 1
    pi = 4 * np.sum(x**2 + y**2 < 1) / n
    print("Execution time:", time.time()-t1, "\nresult:", pi)

calc_pi(2 * 10**8)

Execution time: 2.1929816810879856 
result: 3.14158668


Data-parallel array computations typically scale well too:

In [22]:
%%px --block
import numpy as np

@bodo.jit
def calc_pi(n):
    t1 = time.time()
    x = 2 * np.random.ranf(n) - 1
    y = 2 * np.random.ranf(n) - 1
    pi = 4 * np.sum(x**2 + y**2 < 1) / n
    print("Execution time:", time.time()-t1, "\nresult:", pi)

calc_pi(2 * 10**8)

[stdout:0] 
Execution time: 0.5737681119935587 
result: 3.14162074
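
Conceptually, avoiding intermediate arrays means the whole expression is evaluated element by element in a single pass, so `x`, `y`, `x**2 + y**2` and the boolean mask are never stored. The pure-Python loop below is a hand-written sketch of that fused form. It uses the standard `random` module instead of NumPy, takes a `seed` parameter of our own for repeatability, and is not the code Bodo generates.

```python
import random

def calc_pi_fused(n, seed=0):
    """Monte Carlo estimate of pi using one loop and no arrays."""
    rng = random.Random(seed)
    inside = 0
    for _ in range(n):
        # Each sample is drawn, tested, and discarded immediately.
        x = 2 * rng.random() - 1
        y = 2 * rng.random() - 1
        if x * x + y * y < 1:
            inside += 1
    return 4 * inside / n

estimate = calc_pi_fused(200_000)
print(estimate)
```

In interpreted Python this loop is slow; the benefit appears when a compiler emits the same single pass as native code.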


## Unsupported Pandas/Python Features
### Supported Pandas Operations

Bodo supports a large subset of Pandas APIs as listed [here](http://docs.bodo.ai/latest/source/pandas.html). In addition, dataframe schemas (column names and types) must be stable across operations so that types can be inferred statically. For example, the key column names passed to `groupby` have to be constant for the output type to be stable. This example demonstrates the issue:


In [25]:
import bodo

@bodo.jit
def f(a, i):
    column_list = a[:i]  # some computation that cannot be inferred statically
    df = pd.DataFrame({"A": [1, 2, 1], "B": [4, 5, 6]})
    return df.groupby(column_list).sum()

a = ["A", "B"]
i = 1
f(a, i)

BodoError: groupby(): 'by' parameter only supports a constant column label or column labels.

File "<ipython-input-25-835749d04e4e>", line 5:
def f(a, i):
    <source elided>
    df = pd.DataFrame({"A": [1, 2, 1], "B": [4, 5, 6]})
    return df.groupby(column_list).sum()
    ^


The code can usually be refactored to compute the key list in regular Python and pass it as an argument to the Bodo function:

In [26]:
@bodo.jit
def f(column_list):
    df = pd.DataFrame({"A": [1, 2, 1], "B": [4, 5, 6]})
    return df.groupby(column_list).sum()

a = ["A", "B"]
i = 1
column_list = a[:i]
f(column_list)

  "information.".format(self.func_ir.func_id.func_name)


    B
A    
1  10
2   5


### Supported Python Operations

Bodo relies on Numba for supporting basic Python features. Therefore, Python constructs that are not supported by Numba (see Numba documentation [here](http://numba.pydata.org/numba-doc/latest/reference/pysupported.html)) should be avoided in Bodo programs. For example:

- exceptions: `try` .. `except`, `raise`
- context manager: `with`
- `list`, `set`, `dict` and `generator` comprehensions
- `async` features
- class definition: `class`
- string formatting, e.g. `"A: {}".format(a)`
- lists containing values of heterogeneous types
  * `myList = [1, "a", 0.1]`
- dictionaries containing values of heterogeneous types
  * `myDict = {"A": 1, "B": "a", "C": 0.1}`
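
A common way to work within these limits is to keep such constructs in regular Python and confine the decorated function to the numeric work. The sketch below illustrates the split. The `@bodo.jit` decorator appears only as a comment so that the snippet runs without Bodo installed, and the function names `scaled_sum` and `report` are hypothetical.

```python
# @bodo.jit  # would go here in a Bodo program (assumption: left out so this runs anywhere)
def scaled_sum(values, factor):
    # Numeric kernel: a plain loop over a homogeneous list of floats.
    total = 0.0
    for v in values:
        total += v * factor
    return total

def report(raw_values, factor):
    # Regular Python wrapper: exception handling, the comprehension,
    # and string formatting all stay outside the kernel.
    try:
        values = [float(v) for v in raw_values]
    except ValueError as exc:
        return "failed: {}".format(exc)
    return "A: {}".format(scaled_sum(values, factor))

print(report(["1", "2", "3"], 2.0))  # A: 12.0
```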

### Parallel Data Structures

Bodo can parallelize pandas DataFrame and Series data structures, as well as NumPy arrays. However, collections like lists, sets and dictionaries cannot be parallelized yet.