# HPAT (High Performance Analytics Toolkit)

In this tutorial we'll cover how to use HPAT to accelerate and scale-out data analytics codes.

HPAT automatically parallelizes a subset of Python that is commonly used for data analytics and machine learning. It just-in-time-compiles the functions annotated with the @hpat.jit decorator. The decorated functions are replaced with generated parallel MPI binaries that run on bare metal. The supported data structures for large datasets are Numpy arrays and Pandas dataframes.

HPAT is based on numba, so we'll start with a quick intro to numba. We'll then dive into HPAT, how it deals with pandas data structures and operations like file-reading, filtering, aggregation etc.

## Numba
Let's start with a simple example to get familiar with just-in-time-compiling with numba: computing pi using Monte-Carlo Integration.

We use numpy arrays and their array-notation (no loop).

In [None]:
import numpy as np

def calc_pi(n):
    x = 2 * np.random.ranf(n) - 1
    y = 2 * np.random.ranf(n) - 1
    pi = 4 * np.sum(x**2 + y**2 < 1) / n
    return pi

%timeit calc_pi(2**22)

Numba is a just-in-time compiler. It can compile functions by inserting hooks through a decorator. When the function is finally called numba will attempt to compile the function to native code for the given input types.

In [None]:
import numba as nb
import numpy as np

@nb.jit
def calc_pi(n):
    x = 2 * np.random.ranf(n) - 1
    y = 2 * np.random.ranf(n) - 1
    pi = 4 * np.sum(x**2 + y**2 < 1) / n
    return pi

%timeit calc_pi(2**22)

Not much benefit; most ot the time is spent in numpy which implemented in C anyway.

## Parallel Accelerator
Numba now has a feature to thread-parallelize data-parallel operations.
* __<span style="color:red">In the above cell, try JIT/decorator parameter parallel=True</span>__

### Parallel for/range for explicit (maybe higher-level) parallelsim
For loops can be marked exiplicitly parallel by replacing range with prange and with parallel=True. Reduction variables are automatically recognized. Race-freedom however must be guaranteed by the programmer.

Let's write the same program with an explicit loop.

In [None]:
import numba as nb
import numpy as np

def calc_pi(n):
    x = 2 * np.random.ranf(n) - 1
    y = 2 * np.random.ranf(n) - 1
    s = 0
    for i in range(n):
        s += x[i]**2 + y[i]**2 < 1
    pi = 4 * s / n
    return pi

%timeit calc_pi(2**22)

* While waiting for the result: __<span style="color:red">In the above cell use numba with parallel=True and replace range with nb.prange</span>__. Compared to @nb.jit the parallel option should give you speedup correlating to the number cores in your system.

## Types
numba understand some basic data types like integers, floats, lists and most importantly numpy arrays.
It cannot compile arbirtrary types, though.

We could write the same using pandas data-frames.

In [None]:
import numba as nb
import pandas as pd
import numpy as np

@nb.jit
def calc_pi(n):
    xy = pd.DataFrame({'x': 2 * np.random.ranf(n) - 1,
                       'y': 2 * np.random.ranf(n) - 1})
    pi = 4 * xy[xy.x**2 + xy.y**2 < 1].x.count() / n
    return pi

%timeit calc_pi(2**22)

Using numba with the above will not lead to any performance benefit: it does not compile to native code.

* __<span style="color:red">Try forcing no-python mode by using nopython=True or decorator nb.njit.</span>__

## HPAT
HPAT advances numba in two dimensions:
1. Support for pandas dataframes and operations
2. Scaling out to a cluster using MPI

Let's start with the same pi example but use HPAT's jit. Notice that in contrast to numba, hpat.jit defaults to parallel=True and nopython=True.

### Using Pandas
HPAT knows how to handle pandas!

In [None]:
import hpat
import pandas as pd
import numpy as np

@hpat.jit
def calc_pi(n):
    xy = pd.DataFrame({'x': 2 * np.random.ranf(n) - 1,
                       'y': 2 * np.random.ranf(n) - 1})
    pi = 4 * xy[xy.x**2 + xy.y**2 < 1].x.count() / n
    return pi

%timeit calc_pi(2**22)

### Parallelism, Multi-Processing, MPI
HPAT know how to parallelize and distribute pandas operations across a cluster. It's using MPI to achieve close-to-native efficienty.

Currently running HPAT on multiple processes is not working yet. We need to save our code to a file and start it using mpirun in a (external) shell.

In [None]:
import hpat
import pandas as pd
import numpy as np
from time import process_time as clock

@hpat.jit
def calc_pi(n):
    xy = pd.DataFrame({'x': 2 * np.random.ranf(n) - 1,
                       'y': 2 * np.random.ranf(n) - 1})
    pi = 4 * xy[xy.x**2 + xy.y**2 < 1].x.count() / n
    return pi

# warmup call, we do not want to time the compilation time
pi = calc_pi(2**22)

n = 5

t1 = clock()
for i in range(5):
    pi = calc_pi(2**22)
t2 = clock()
print('pi={}'.format(pi))

print('{:.2f} ms per loop'.format((t2-t1)*1000.0/n))

In [None]:
# Make sure you provide the right cell number as the last argument!
%save -f runme.py ??
!mpirun -n 4 python ./runme.py

## File Input
<img style="float: right;" src="img/file-read.jpg">
Efficient parallel data processing requries data gathering to be parallel as well. HPAT provides parallel and distributed file-reading for different file formats. This tutorial will cover CSV and parquet; HDF5 is supported as well but left to the reader to expore.

### Parquet
Parquet is a convenient file-format because it not only stores the data but also meta information like data-types.
Let's read a file using pandas.


In [None]:
import pandas as pd
d = pd.read_parquet('cycling_dataset.pq')
d.head()

* __<span style="color:red">Now put this into a function, @hpat.jit-compile and call it</span>__

We've successfully read a file and returned the number of rows in it.
We'll investigate the distribution/parallelism in a bit. Hold back for now.

### CSV
Let's read the same data from a CSV file. We'll see that type-information is required - we want to compile to native machine code!

In [None]:
import pandas as pd
import hpat

@hpat.jit
def read_csv():
    return len(pd.read_csv('cycling_dataset.csv'))

d = read_csv()

We need to provide column names and their types! We will also add a call to hpat.distribution_report() to see how HPAT actually distributes/parallelises the data.

In [None]:
import pandas as pd
import hpat
import numpy as np

@hpat.jit
def read_csv():
    colnames = ['altitude', 'cadence', 'distance', 'hr', 'latitude', 'longitude', 'power', 'speed', 'time']
    coltypes = {'altitude': np.float64,
                'cadence': np.float64,
                'distance': np.float64,
                'hr': np.float64,
                'latitude': np.float64,
                'longitude': np.float64,
                'power': np.float64,
                'speed': np.float64,
                'time': str}
    return pd.read_csv('cycling_dataset.csv', names=colnames, dtype=coltypes, skiprows=1)

d = read_csv()
hpat.distribution_report()

The distribution report tells us that all data is actually replicated, e.g. there is no parallelism. The reason for this is that by default any input and output to a HPAT-jit'ed function is replicated on all processes.

Try the following (in the above cell)

* __<span style="color:red">Removing the return statement will eliminate the entire file read!</span>__
* __<span style="color:red">If we return the shape of the dataframe the data will be block partitioned along the first axis. This is because we actually make use of the data but we do not return an entire column/data-frame.</span>__
* __<span style="color:red">You can now make the file-name a parameter to the function because we specify column-names and types. Try it!</span>__
* __<span style="color:red">Try with MPI!</span>__


In [None]:
%save -f runme.py ??
!mpirun -n 4 python ./runme.py

<img style="float: right;" src="img/data-parallel.jpg">

## Simple Data-Parallel Operations
Many operations in pandas are trivially data-parallel. There is no communication needed to compute them in parallel.
Examples are filtering, combining columns, normlization, dropping columns/rows etc.

Let's drop some rows and some columns. Additionally, we create a new column by extracting the month from the time column.

In [None]:
import pandas as pd
import hpat
from datetime import datetime

@hpat.jit
def data_par():
    df = pd.read_parquet('cycling_dataset.pq')
    df = df[df.power!=0]
    df['month'] = df.time.dt.month
    df = df.drop(['latitude', 'longitude', 'power', 'time'], axis=1)
    return len(df)

print(data_par())
hpat.distribution_report()

* __<span style="color:red">Try with MPI!</span>__

In [None]:
%save -f runme.py ??
!mpirun -n 4 python ./runme.py

<img style="float: right;" src="img/reduction.jpg">

## Reduction operations
Reductions, such as avg, sum, mean etc, are mapped to efficient MPI code. Their result gets replicated on all processes.

As an example let's compute the mean of the 'power' column.

In [None]:
import pandas as pd
import hpat

@hpat.jit
def mean_power():
    df = pd.read_parquet('cycling_dataset.pq')
    return df.power.mean()

print(mean_power())
hpat.distribution_report()

In [None]:
%save -f runme.py ??
!mpirun -n 4 python ./runme.py

* __<span style="color:red">Try other reductions such as sum()</span>__

<img style="float: right;" src="img/groupby.jpg">

## GroupBy/Aggregation
More challenging for parallel and distributed environments are grouping operations which are typically followed by aggregations/reductions. Like with simple reductions HPAT maps groupby and aggregations to efficient MPI code.

Let's compute the average power output per hour. WE return the number of means to avoid REP propagation.

In [None]:
import pandas as pd
import hpat

@hpat.jit
def mean_power_pm():
    df = pd.read_parquet('cycling_dataset.pq')
    df['hour'] = df.time.dt.hour
    grp = df.groupby('hour')
    mean = grp['power'].mean()
    return len(mean)

print(mean_power_pm())
hpat.distribution_report()

In [None]:
%save -f runme.py ??
!mpirun -n 4 python ./runme.py

* __<span style="color:red">Try other aggregation/groupby</span>__

<img style="float: right;" src="img/rolling.jpg">

## Sliding Windows
Some popular operations, in particular for time-series analysis, are based on sliding windows, like computing the moving average or percentage change. In a distributed setup these require communication beyond map-reduce. Again, HPAT maps this to efficient patterns known from HPC.

Let's compute the moving average of the heart-rate.

In [None]:
import pandas as pd
import hpat

@hpat.jit
def mov_avg():
    df = pd.read_parquet('cycling_dataset.pq')
    mv_av = df.hr.rolling(4).mean()
    return mv_av.mean()

print(mov_avg())
hpat.distribution_report()

In [None]:
%save -f runme.py ??
!mpirun -n 4 python ./runme.py

## Join
HPAT can also efficiently join dataframes.

Let's read data, split into 2 dataframes and re-join on time column.

In [None]:
import pandas as pd
import numpy as np
import hpat

@hpat.jit
def merge_dfs():
    df = pd.read_parquet('cycling_dataset.pq')
    df1 = df[['altitude', 'cadence', 'distance', 'hr', 'time']]
    df2 = df[['latitude', 'longitude', 'power', 'speed', 'time']]
    df3 = df1.merge(df2, on='time')
    return len(df3)

print(merge_dfs())
hpat.distribution_report()

In [None]:
%save -f runme.py ??
!mpirun -n 4 python ./runme.py

# Using daal4py with HPAT
Often data analytics involve some machine learning steps like regression, clustering, classification... 
We are developing a compatibility package in daal4py which let's use use daal4py in a hpat.jit'ed function.

The following code trains a model to predict the power output. daal4py's linear regression supports training on distributed data-sets. We'll use HPAT to distribute the data, preprocess it and then it will hand it over to daal4py which will train a single linear model with the distributed data set.

In [None]:
import numpy as np
import hpat
import daal4py as d4p
import daal4py.hpat
import pandas as pd

@hpat.jit
def train():
    # Read training data
    train_set = pd.read_parquet('cycling_train_dataset.pq')
    # Remove entries where power==0
    train_set = train_set[train_set.power!=0]
    # Reduce the dataset, create X.  We drop the target, and other non-essential features.
    reduced_dataset = train_set.drop(['time','power','latitude','longitude'], axis=1)
    # Get the target, create Y as an 2d array of float64
    target = train_set.power.values.reshape(len(train_set),1).astype(np.float64)
    
    # Create a daal4py linear regression algorithm object
    d4p_lm = d4p.linear_regression_training(interceptFlag=True)
    # Train the model
    lm_trained = d4p_lm.compute(reduced_dataset.values, target)

    # Finally return the result
    return lm_trained

train_result = train()
print(train_result)

In [None]:
%save -f runme.py ??
!mpirun -n 4 python ./runme.py

The full daal4py+HPAT example is provided in a separate notebook (daal4py_data_science.ipynb) and includes the distributed prediction step as well.

## Distribution Annotations
Like numba, HPAT accepts typing annotation to the jit decorator. In addition, you can tell HPAT to accept data as already partitioned and distributed and/or return data distributedly, e.g. ungathered. This can be very useful if you want to orchestrate several re-usable functions in a truely SPMD fashion.

We saw earlier that returning a data-frame or series will replicate the data-frame on all process and also replacate all related computation. It essentially prohibits parallel/distributed execution. We can tell HPAT to not gather the returned data and instead just return the local portions on each process.

In [None]:
import pandas as pd
import hpat

@hpat.jit(distributed=['df'])
def read_pq():
    df = pd.read_parquet('cycling_dataset.pq')
    return df

df = read_pq()
print(df.shape)
hpat.distribution_report()

In [None]:
%save -f runme.py ??
!mpirun -n 4 python ./runme.py

# Intraday-stock-mean-reversion-trading-backtest
A nice backtest from http://www.pythonforfinance.net/2017/02/20/intraday-stock-mean-reversion-trading-backtest-in-python/ works nicely with HPAT. The full code use many of the above features and is attached (intraday_mean.py) to the tutorial. Enjoy!
