<img src="img/logo.svg" align="right" style="background:black;height:50px; padding-left:5px;padding-right:5px;padding-bottom:0px;">

# Distributed

According to the [Bodo documentation](https://docs.bodo.ai/latest/source/getting_started.html), 
*Bodo can scale your analytics code to thousands of cores, providing orders of magnitude speed up depending on program characteristics*. 
To accomplish that, **bodo** uses [MPI](https://en.wikipedia.org/wiki/Message_Passing_Interface)
under the hood. To take advantage of it, you need to:

1. decorate your function with `bodo.jit`, and
2. launch the code on several processes with `mpiexec` if you are running a Python script, or
3. use the [%%px magic](https://ipyparallel.readthedocs.io/en/latest/magics.html) if you are working in `JupyterLab`.

Since we are working inside **JupyterLab**, we first need to set up the `%%px` magic:

In [1]:
import ipyparallel as ipp
c = ipp.Client(profile="mpi")
view = c[:]
view.activate()
view.block = True
import os
view["cwd"] = os.getcwd()
%px cd $cwd

[stdout:0] /home/xmn/dev/bodoai/bodo-benchmarks/notebooks
[stdout:1] /home/xmn/dev/bodoai/bodo-benchmarks/notebooks
[stdout:2] /home/xmn/dev/bodoai/bodo-benchmarks/notebooks
[stdout:3] /home/xmn/dev/bodoai/bodo-benchmarks/notebooks


Now, let's create a dataset to test the parallelization with **bodo**.

In [2]:
import pandas as pd
import numpy as np
import bodo

In [3]:
NUM_GROUPS = 30
NUM_ROWS = 20_000_000
df = pd.DataFrame({
    "A": np.arange(NUM_ROWS) % NUM_GROUPS,
    "B": np.arange(NUM_ROWS)
})
df.to_parquet("data/example1.pq")
print(df)

           A         B
0          0         0
1          1         1
2          2         2
3          3         3
4          4         4
...       ..       ...
19999995  15  19999995
19999996  16  19999996
19999997  17  19999997
19999998  18  19999998
19999999  19  19999999

[20000000 rows x 2 columns]


Now, run the following code to see its behavior.

In [4]:
%%px
import bodo
import pandas as pd

@bodo.jit
def test():
    df = pd.read_parquet("data/example1.pq")
    df2 = df.groupby("A").sum()
    m = df2.B.max()
    return m

m = test()
print(m)

[stdout:0] 6666676000003
[stdout:1] 6666676000003
[stdout:2] 6666676000003
[stdout:3] 6666676000003
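
The printed value can be cross-checked without Bodo or pandas. Column `A` is `i % NUM_GROUPS` and column `B` is `i`, so each group's sum is an arithmetic series. The sketch below uses plain Python (the helper name `group_sum` is ours and not part of any library) to compute every group sum in closed form and take the maximum.

```python
NUM_GROUPS = 30
NUM_ROWS = 20_000_000


def group_sum(g: int) -> int:
    """Sum of all i in range(NUM_ROWS) with i % NUM_GROUPS == g."""
    # Members are g, g + 30, g + 60, ...; count how many stay below NUM_ROWS.
    count = (NUM_ROWS - g + NUM_GROUPS - 1) // NUM_GROUPS
    # Arithmetic series: count * first + step * (0 + 1 + ... + (count - 1)).
    return count * g + NUM_GROUPS * count * (count - 1) // 2


sums = [group_sum(g) for g in range(NUM_GROUPS)]
largest = max(sums)
print(largest, sums.index(largest))
```

The largest sum belongs to group 19, the last group that still receives 666,667 rows, and it equals the `6666676000003` reported by every engine.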


For the rest of this tutorial, we will use the same dataset as the 
[dask tutorial](https://github.com/dask/dask-tutorial/).
The data preparation code is available at
https://github.com/dask/dask-tutorial/blob/master/prep.py 
and is distributed under this license: https://github.com/dask/dask-tutorial/blob/master/LICENSE.txt

In [5]:
# data preparation
from glob import glob
import os
import time
import urllib.request
import tarfile

import pandas as pd


def flights(small=None):
    start = time.time()
    
    data_dir = './data/'
    flights_url = "https://storage.googleapis.com/dask-tutorial-data/nycflights.tar.gz"
    
    flights_raw = os.path.join(data_dir, 'nycflights.tar.gz')
    flightdir = os.path.join(data_dir, 'nycflights')
    jsondir = os.path.join(data_dir, 'flightjson')
    
    if small is None:
        small = bool(os.environ.get("DASK_TUTORIAL_SMALL", False))

    if small:
        N = 500
    else:
        N = 10_000

    if not os.path.exists(flights_raw):
        print("- Downloading NYC Flights dataset... ", end='', flush=True)
        url = flights_url
        urllib.request.urlretrieve(url, flights_raw)
        print("done", flush=True)

    if not os.path.exists(flightdir):
        print("- Extracting flight data... ", end='', flush=True)
        tar_path = os.path.join(data_dir, 'nycflights.tar.gz')
        with tarfile.open(tar_path, mode='r:gz') as flights:
            flights.extractall('data/')

        if small:
            for path in glob(os.path.join(data_dir, "nycflights", "*.csv")):
                with open(path, 'r') as f:
                    lines = f.readlines()[:1000]

                with open(path, 'w') as f:
                    f.writelines(lines)

        print("done", flush=True)

    if not os.path.exists(jsondir):
        print("- Creating json data... ", end='', flush=True)
        os.mkdir(jsondir)
        for path in glob(os.path.join(data_dir, 'nycflights', '*.csv')):
            prefix = os.path.splitext(os.path.basename(path))[0]
            df = pd.read_csv(path, nrows=N)
            df.to_json(os.path.join(data_dir, 'flightjson', prefix + '.json'),
                       orient='records', lines=True)
        print("done", flush=True)
    else:
        return

    end = time.time()
    print("** Created flights dataset! in {:0.2f}s**".format(end - start))
    
    
flights()

You can read **CSV** files with **bodo** much as you would with **dask** 
or **pandas**. In fact, you call the pandas function `read_csv` inside a `bodo.jit`
function:

In [6]:
import bodo
import pandas as pd
import numpy as np

@bodo.jit
def read_flight_csv():
    return pd.read_csv(
        'data/nycflights/',
        parse_dates={'Date': [0, 1, 2]},
        # it needs all fields here
        dtype={
            'Year': np.int16,
            'Month': np.int8,
            'DayofMonth': np.int8,
            'DayOfWeek': np.int8,
            'DepTime': np.float32,
            'CRSDepTime': np.float32,
            'ArrTime': np.float32,
            'CRSArrTime': np.float32,
            'UniqueCarrier': str,
            'FlightNum': np.int16,
            'TailNum': str,
            'ActualElapsedTime': np.float32,
            'CRSElapsedTime': np.float32,
            'AirTime': np.float32,
            'ArrDelay': np.float32,
            'DepDelay': np.float32,
            'Origin': str,
            'Dest': str,
            'Distance': np.float32,
            'TaxiIn': np.float32,
            'TaxiOut': np.float32,
            'Cancelled': np.bool_,
            'Diverted': np.bool_,
        }
    )

t0 = time.time()
df = read_flight_csv()
t1 = time.time()

print(t1 - t0, 's')

df.head()



7.5921385288238525 s


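|   | Year | Month | DayofMonth | DayOfWeek | DepTime | CRSDepTime | ArrTime | CRSArrTime | UniqueCarrier | FlightNum | ... | AirTime | ArrDelay | DepDelay | Origin | Dest | Distance | TaxiIn | TaxiOut | Cancelled | Diverted |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1990 | 1 | 1 | 1 | 1621.0 | 1540.0 | 1747.0 | 1701.0 | US | 33 | ... |  | 46.0 | 41.0 | EWR | PIT | 319.0 |  |  | False | False |
| 1 | 1990 | 1 | 2 | 2 | 1547.0 | 1540.0 | 1700.0 | 1701.0 | US | 33 | ... |  | -1.0 | 7.0 | EWR | PIT | 319.0 |  |  | False | False |
| 2 | 1990 | 1 | 3 | 3 | 1546.0 | 1540.0 | 1710.0 | 1701.0 | US | 33 | ... |  | 9.0 | 6.0 | EWR | PIT | 319.0 |  |  | False | False |
| 3 | 1990 | 1 | 4 | 4 | 1542.0 | 1540.0 | 1710.0 | 1701.0 | US | 33 | ... |  | 9.0 | 2.0 | EWR | PIT | 319.0 |  |  | False | False |
| 4 | 1990 | 1 | 5 | 5 | 1549.0 | 1540.0 | 1706.0 | 1701.0 | US | 33 | ... |  | 5.0 | 9.0 | EWR | PIT | 319.0 |  |  | False | False |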


You can also run it using the `%%px` magic:

In [7]:
%%px
import time

import bodo
import pandas as pd
import numpy as np

@bodo.jit
def read_flight_csv():
    return pd.read_csv(
        'data/nycflights/',
        parse_dates={'Date': [0, 1, 2]},
        # it needs all fields here
        dtype={
            'Year': np.int16,
            'Month': np.int8,
            'DayofMonth': np.int8,
            'DayOfWeek': np.int8,
            'DepTime': np.float32,
            'CRSDepTime': np.float32,
            'ArrTime': np.float32,
            'CRSArrTime': np.float32,
            'UniqueCarrier': str,
            'FlightNum': np.int16,
            'TailNum': str,
            'ActualElapsedTime': np.float32,
            'CRSElapsedTime': np.float32,
            'AirTime': np.float32,
            'ArrDelay': np.float32,
            'DepDelay': np.float32,
            'Origin': str,
            'Dest': str,
            'Distance': np.float32,
            'TaxiIn': np.float32,
            'TaxiOut': np.float32,
            'Cancelled': np.bool_,
            'Diverted': np.bool_,
        }
    )

t0 = time.time()
df = read_flight_csv()
t1 = time.time()

print(t1 - t0, 's')

[stdout:0] 7.619726657867432 s
[stdout:1] 7.535857915878296 s
[stdout:2] 7.503788232803345 s
[stdout:3] 7.51956582069397 s



As you can see, this produces four outputs, one for each process.

Now, let's compute something more interesting: the maximum average departure delay 
of non-cancelled flights, grouped by origin airport.
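
To make the target quantity concrete, here is a plain-Python sketch of the same computation on a handful of made-up rows. The values are hypothetical and are not taken from the NYC Flights data. It drops cancelled flights, averages `DepDelay` per `Origin`, and keeps the largest average.

```python
from collections import defaultdict

# Hypothetical rows: (Origin, DepDelay in minutes, Cancelled)
rows = [
    ("EWR", 41.0, False),
    ("EWR", 7.0, False),
    ("JFK", 30.0, False),
    ("JFK", 500.0, True),  # cancelled, so it must be ignored
    ("LGA", 2.0, False),
    ("LGA", 10.0, False),
]

totals = defaultdict(float)  # sum of delays per origin
counts = defaultdict(int)    # number of non-cancelled flights per origin
for origin, dep_delay, cancelled in rows:
    if cancelled:
        continue
    totals[origin] += dep_delay
    counts[origin] += 1

# Mean delay per origin, then the maximum of those means.
mean_delay = {origin: totals[origin] / counts[origin] for origin in totals}
largest_delay = max(mean_delay.values())
print(mean_delay)
print(largest_delay)
```

The cancelled JFK flight does not affect the result: the means are computed only from the remaining rows, and the maximum is taken over one mean per airport.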

In [8]:
@bodo.jit
def get_largest_delay(df):
    # Maximum average non-cancelled delay grouped by Airport
    return df[~df.Cancelled].groupby('Origin').DepDelay.mean().max()


t0 = time.time()
largest_delay = get_largest_delay(df)
t1 = time.time()

print(t1 - t0, 's')



10.188231229782104 s


We can do the same thing using `%%px`:

In [9]:
%%px

@bodo.jit
def get_largest_delay(df):
    # Maximum average non-cancelled delay grouped by Airport
    return df[~df.Cancelled].groupby('Origin').DepDelay.mean().max()


t0 = time.time()
largest_delay = get_largest_delay(df)
t1 = time.time()

print(t1 - t0, 's')

[stdout:0] 3.4489574432373047 s
[stdout:1] 3.4456801414489746 s
[stdout:2] 3.4475417137145996 s
[stdout:3] 3.4464614391326904 s



For the next examples, we will use just the `%%px` approach.

### Some Questions to Consider:

- How much speedup is possible using **bodo** on a single core?
- Given how many cores this machine has, how much faster could the parallel version be than the single-threaded one?
- Could it be faster with eager compilation?
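
One rough way to approach the second question is to compare the wall-clock times printed earlier in this notebook. The sketch below plugs in the timings from the two `get_largest_delay` runs. Treat the result as indicative only: both measurements include JIT compilation time, and the numbers will differ on another machine.

```python
import os

# Timings (seconds) copied from the get_largest_delay outputs in this notebook.
single_process = 10.188231229782104
per_engine = [
    3.4489574432373047,
    3.4456801414489746,
    3.4475417137145996,
    3.4464614391326904,
]

# The parallel run is only finished when the slowest engine is done.
parallel = max(per_engine)
n_engines = len(per_engine)

speedup = single_process / parallel
efficiency = speedup / n_engines  # 1.0 would be perfect linear scaling

print(f"logical cores on this machine: {os.cpu_count()}")
print(f"speedup: {speedup:.2f}x on {n_engines} engines")
print(f"parallel efficiency: {efficiency:.0%}")
```

With these numbers the speedup is roughly 2.95x on four engines, which is about 74% of ideal linear scaling.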

The first time a jitted function is called, it is compiled for the types of the arguments passed,
so that call takes extra time. On later calls with the same argument types, the function is already 
compiled and should run faster. One way to avoid the extra time on the first call
is to compile the function when it is defined. This is also known as **eager compilation**. For example:

In [10]:
%%px

import bodo

@bodo.jit('int64(int64,int64)')
def calc_add(a: int, b: int) -> int:
    return a + b


calc_add(1, 2)

Out[0:17]: 3

Out[1:17]: 3

Out[2:17]: 3

Out[3:17]: 3

As you can see, you can pass the argument and return types to the `bodo.jit` decorator. 
The decorator then compiles the function for that specific signature.
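
The difference between lazy and eager compilation can be illustrated with a toy model in plain Python. This is not how Bodo is implemented; `toy_jit` and its `signature` argument are hypothetical names used only to show when the "compile" step happens: on the first call for each combination of argument types (lazy), or once at definition time when a signature is supplied (eager).

```python
import functools

compile_log = []  # records every simulated "compilation"


def toy_jit(signature=None):
    """Toy stand-in for a JIT decorator (hypothetical, not Bodo's API)."""

    def decorator(func):
        compiled = {}  # argument-type tuple -> "compiled" callable

        def compile_for(arg_types, when):
            compile_log.append((func.__name__, arg_types, when))
            compiled[arg_types] = func  # a real JIT would emit machine code

        if signature is not None:
            # Eager: pay the compilation cost while defining the function.
            compile_for(tuple(signature), "definition")

        @functools.wraps(func)
        def wrapper(*args):
            arg_types = tuple(type(a) for a in args)
            if arg_types not in compiled:
                # Lazy: pay the compilation cost on the first call.
                compile_for(arg_types, "first call")
            return compiled[arg_types](*args)

        return wrapper

    return decorator


@toy_jit()
def lazy_add(a, b):
    return a + b


@toy_jit(signature=(int, int))
def eager_add(a, b):
    return a + b


print(compile_log)  # only eager_add has been "compiled" so far
lazy_add(1, 2)      # triggers the lazy "compilation"
lazy_add(3, 4)      # reuses it
eager_add(1, 2)     # nothing new to compile
print(compile_log)
```

In this model the eager variant moves the one-time cost from the first call to the definition, which mirrors the trade-off described above. Calling either function with new argument types (for example floats) still triggers another "compilation".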

## Making a cluster

### IPython parallel

As shown at the beginning of this document, if you want to use `%%px` 
to run your code in parallel (it uses **MPI** under the hood), you first need 
to create an **IPython Parallel Client** and a profile (here we are using 
a profile called **mpi**).

At this point, if you have already followed the 
[Introductory notebook](https://quansight.github.io/bodo-benchmarks/00_introduction.html),
your environment should already be working. Still, it is important to understand
what is happening here.

First, you need to create the **IPython** profile. In a terminal, run the following command:

```sh
ipython profile create --parallel --profile=mpi
```

Now, start a cluster for the profile you just created (here with four engines):

```sh
ipcluster start -n 4 --profile=mpi &
```

The cluster is now available, and you can connect to it from the **Jupyter notebook** using
`ipyparallel`:

In [11]:
import ipyparallel as ipp

c = ipp.Client(profile="mpi")
view = c[:]
view.activate()
view.block = True

import os

view["cwd"] = os.getcwd()
%px cd $cwd

[stdout:0] /home/xmn/dev/bodoai/bodo-benchmarks/notebooks
[stdout:1] /home/xmn/dev/bodoai/bodo-benchmarks/notebooks
[stdout:2] /home/xmn/dev/bodoai/bodo-benchmarks/notebooks
[stdout:3] /home/xmn/dev/bodoai/bodo-benchmarks/notebooks


For more information about how to use **ipyparallel** with **MPI**, please check the 
[documentation page](https://ipyparallel.readthedocs.io/en/latest/mpi.html).

## References

This notebook was inspired by the
[Dask tutorials](https://github.com/dask/dask-tutorial/) and the
[Bodo documentation](https://docs.bodo.ai/latest/source/getting_started.html).