# GTC 2018 Numba Tutorial Notebook 7: GPU DataFrames

## What is a DataFrame?

So far, we've been using Numba to operate on objects that behave like NumPy arrays: n-dimensional where every element has the same type.  (Note: The element type can be a struct.)  For many applications, like waveforms, images, lists of numbers, etc, it is very natural to create n-dimensional arrays.

However, another data model is also very common in data science.  The DataFrame is a 2-dimensional table, where each row represents a record and each column can have a different data type:

| time | sensor_id | temp | humidity | flags |
|------|-----------|------|----------|-------|
| 1    | A         | 28   | 50       | 0     |
| 4    | B         | 25   | 51       | 0     |
| 8    | A         | 27   | 49       | 1     |

The DataFrame is very similar to the data returned by relational databases.  DataFrames are stored such that all the data in each column is contiguous in memory, which allows for much faster columnar processing.

In Python, the standard library for working with DataFrames on the CPU is Pandas.  Last year, as part of the [GPU Open Analytics Initiative](http://gpuopenanalytics.com/), the Numba developers began implementing a GPU DataFrame library, called `PyGDF`, inspired by the Pandas API and based on the [Apache Arrow](https://arrow.apache.org/) project.

In this notebook, we'll introduce the capabilities of PyGDF, and show how your knowledge of CUDA Python will allow you to implement some powerful data transformations that run directly on the GPU.

**WARNING**: PyGDF is still very new and experimental.  It is continually being improved and optimized, so if you encounter issues, let us know at: https://github.com/gpuopenanalytics/pygdf

## Loading Data

Currently, loading and parsing data from disk still has to take place on the CPU.  Let's load a CSV file of sensor data with Pandas:

In [2]:
import numpy as np
import pandas as pd

df = pd.read_csv('sensor.csv', dtype=dict(flags=int))
df.dtypes

time         float64
sensor_id     object
temp         float64
humidity     float64
flags          int64
dtype: object

In [3]:
df.head()

Unnamed: 0,time,sensor_id,temp,humidity,flags
0,0.0,B,22.498096,0.465723,0
1,0.001,C,21.693834,0.475885,0
2,0.002,B,21.130495,0.483004,0
3,0.003,B,20.719901,0.488192,0
4,0.004,C,20.484617,0.491165,0


The format of this (not real) data is:

  * time - Time since start of recording (s)
  * sensor_id - Name of temperature/humidity sensor
  * temp - Temperature (C)
  * humidity - Relative humidity (0-1)
  * flags - flag is set to 1 if motion sensor records activity, 0 otherwise
  
To load this data onto the GPU, we need to recode the `sensor_id` column, which is currently a string.  PyGDF does not support strings on the GPU, but fortunately we can rewrite this column as a categorical.  Categorical columns map each unique value to an integer for faster processing:

In [4]:
df['sensor_id'] = df['sensor_id'].astype('category')
df['sensor_id'].dtype

CategoricalDtype(categories=['A', 'B', 'C'], ordered=False)

Now let's load this data onto the GPU:

In [5]:
import pygdf

gdf = pygdf.DataFrame.from_pandas(df)
gdf.dtypes

time          float64
sensor_id    category
temp          float64
humidity      float64
flags           int64
dtype: object

If we want to look at some part of the DataFrame on the CPU, we can call the `.to_pandas()` method to copy the data back to a regular Pandas DataFrame.  This code eshows us the first 5 rows on the GPU:

In [6]:
gdf.head().to_pandas()

Unnamed: 0,time,sensor_id,temp,humidity,flags
0,0.0,B,22.498096,0.465723,0
1,0.001,C,21.693834,0.475885,0
2,0.002,B,21.130495,0.483004,0
3,0.003,B,20.719901,0.488192,0
4,0.004,C,20.484617,0.491165,0


With suitable plotting tools [Holoviews + Datashader](http://holoviews.org/user_guide/Large_Data.html) we could plot all 1 million points in the notebook, but for simplicity we've included the plot images below:

<img src="img/sensor_temp.png" alt="temperature" style="width: 250px; display: inline"/><img src="img/sensor_humidity.png" alt="humidity" style="width: 250px; display: inline"/>

We can see glitches in the temperature data (which we will assume are due to sensor errors), which we will clean up later in this tutorial.

## Column Transformations

One of the most basic GDF operations is a column transform.  We can perform built-in arithmetic operations on each column, such as type casting:

In [7]:
gdf['temp'] = gdf['temp'].astype(np.float32)
gdf['humidity'] = gdf['humidity'].astype(np.float32)
gdf['flags'] = gdf['flags'].astype(np.int32)
gdf.dtypes

time          float64
sensor_id    category
temp          float32
humidity      float32
flags           int32
dtype: object

or column arithmetic:

In [8]:
gdf['temp_f'] = gdf['temp'] * 9/5 + 32
gdf.head().to_pandas()

Unnamed: 0,time,sensor_id,temp,humidity,flags,temp_f
0,0.0,B,22.498096,0.465723,0,72.496574
1,0.001,C,21.693834,0.475885,0,71.048904
2,0.002,B,21.130495,0.483004,0,70.034889
3,0.003,B,20.719902,0.488192,0,69.295822
4,0.004,C,20.484617,0.491165,0,68.872314


But for non-trivial expressions, we should use the `apply_rows` function to provide a custom function that loops over all rows.  This function looks like writing serial code, but it will be translated by PyGDF (using Numba) into properly strided, parallel GPU code.  It is important that this function only operate on one row at a time, or the translation will not be correct.

In [9]:
def to_fahrenheit(temp, temp_f_int):
    for i, t in enumerate(temp):
        temp_f_int[i] = 9/5 * t + 32.0

The `.apply_rows()` function takes the serial function, the names of the input columns, a dictionary of output columns and target data types, and optionally a dictionary of extra, non-GDF arguments (not used here):

In [10]:
gdf = gdf.apply_rows(to_fahrenheit,
               incols=['temp'],
               outcols=dict(temp_f_int=np.int32),
               kwargs={})
gdf.head().to_pandas()

Unnamed: 0,time,sensor_id,temp,humidity,flags,temp_f,temp_f_int
0,0.0,B,22.498096,0.465723,0,72.496574,72
1,0.001,C,21.693834,0.475885,0,71.048904,71
2,0.002,B,21.130495,0.483004,0,70.034889,70
3,0.003,B,20.719902,0.488192,0,69.295822,69
4,0.004,C,20.484617,0.491165,0,68.872314,68


## Filtering

Filtering can be done with the `query()` method, which takes an expression string of column names.  Let's remove all the rows where the flags (indicating motion near the sensor) are zero.

In [11]:
print('Original len:', len(gdf))

reduced_gdf = gdf.query('flags == 0')
# don't need these columns anymore
del reduced_gdf['flags']  
del reduced_gdf['temp_f']
del reduced_gdf['temp_f_int']

print('Reduced len:', len(reduced_gdf))

Original len: 1000000
Reduced len: 999889


## Groupby

Probably one of the most effective uses of the GPU DataFrame is for grouping operations followed by transformations or reductions.

### Groupby Reductions

The simplest usage of groupby is to apply a reduction to combine rows in the same group.

For this analysis, we are going to look at data in 5 second time bins.  First we need to compute a bin number based on the time column:

In [12]:
reduced_gdf['time_bin'] = (reduced_gdf['time'] / 5).astype(np.int32)

Now let's compute the mean in each bin for each sensor:

In [13]:
mean_gdf = reduced_gdf.groupby(by=['time_bin', 'sensor_id']).mean()
mean_gdf.head(10).to_pandas()  # the result of a groupby is another GPU DataFrame

Unnamed: 0,time_bin,sensor_id,time,temp,humidity
0,0,A,2.500937,21.752899,0.475139
1,0,B,2.479421,21.710459,0.475675
2,0,C,2.517933,21.684062,0.476009
3,1,A,7.571181,21.675455,0.476118
4,1,B,7.466885,21.654796,0.476379
5,1,C,7.46349,21.62459,0.47676
6,2,A,12.499554,21.450349,0.478962
7,2,B,12.507045,21.567419,0.477483
8,2,C,12.491396,21.479624,0.478592
9,3,A,17.502512,21.307641,0.480765


Sometimes, we want to apply a different reduction to each column and use more than one reductions of some columns.  Let's keep the earliest time in each bin, and get the mean and standard deviation of the temp and humidity columns:

In [19]:
binned_gdf = reduced_gdf.groupby(by=['time_bin', 'sensor_id']).agg({
        'time': 'min',
        'temp': ['mean', 'std'], 
        'humidity': ['mean', 'std']
    })

binned_gdf.head(20).to_pandas()

  var = asum / div - (mu ** 2) * n / div
  var = asum / div - (mu ** 2) * n / div


Unnamed: 0,time_bin,sensor_id,time,temp_mean,temp_std,humidity_mean,humidity_std
0,0,A,0.005,21.752899,1.665295,0.475139,0.021042
1,0,B,0.0,21.710459,1.657004,0.475675,0.020937
2,0,C,0.001,21.684062,1.612681,0.476009,0.020377
3,1,A,5.007005,21.675455,1.650455,0.476118,0.020854
4,1,B,5.002005,21.654796,1.638052,0.476379,0.020698
5,1,C,5.000005,21.62459,1.703364,0.47676,0.021523
6,2,A,10.00001,21.450349,1.678541,0.478962,0.021209
7,2,B,10.00101,21.567419,1.646382,0.477483,0.020803
8,2,C,10.00301,21.479624,1.638245,0.478592,0.0207
9,3,A,15.000015,21.307641,1.696614,0.480765,0.021438


### Group Transformations

Now that we've learned some CUDA programming, we can write a custom function that `.apply_grouped()` can execute on the GPU so that each CUDA block is given an entire group to operate on.  The function will be embedded inside a CUDA kernel that assigns each group in the dataset to a CUDA block, and then you can do any collective processing of the group with the standard CUDA thread techniques that were shown in the previous notebooks.

For this example, we're going to identify the "glitch" temperature readings by computing the mean and standard deviation of samples in the time bin (for each sensor) and then tagging as bad all records where the temperature is more than 5 standard deviations from the mean.

In [16]:
import math
from numba import cuda
from numba import float32

def transform_function(time_bin, sensor_id, temp, bad):
    # each of these arguments is a NumPy view including just the elements of the current group
    # time_bin.size == sensor_id.size == temp.size == bad.size
    # bad will be our output array flagging
    
    # we need some scratch space that all threads in this block can see
    
    # zeroth, first, and second moments
    moments = cuda.shared.array(3, float32)
    # mean (element 0) and standard deviation (element 1)
    mean_std = cuda.shared.array(2, float32)
    
    # Initialize the shared memory we're about to modify
    if cuda.threadIdx.x == 0:
        moments[0] = 0.0
        moments[1] = 0.0
        moments[2] = 0.0
    cuda.syncthreads()
    
    # First pass: Compute temperature moments in the group
    for i in range(cuda.threadIdx.x, temp.size, cuda.blockDim.x):
        cuda.atomic.add(moments, 0, 1)
        cuda.atomic.add(moments, 1, temp[i])
        cuda.atomic.add(moments, 2, temp[i]**2)
    cuda.syncthreads()

    # Now compute mean and standard deviation
    if cuda.threadIdx.x == 0:
        mean_std[0] = moments[1] / moments[0]
        mean_std[1] = math.sqrt(moments[2]/moments[0] - mean_std[0]**2)
    cuda.syncthreads()

    # Second pass: Tag bad temperatures
    for i in range(cuda.threadIdx.x, temp.size, cuda.blockDim.x):
        bad[i] = (abs(temp[i] - mean_std[0]) / mean_std[1]) > 5

# Execute the group transformation kernel
bad_tagged_gdf = reduced_gdf.groupby(by=['time_bin', 'sensor_id']).apply_grouped(
    transform_function,
    incols=['time_bin', 'sensor_id', 'temp'],
    outcols={'bad': np.int32},
    # Set the thread-per-block for the GPU kernel.
    # It's defaulted to 1 to emulate serial behavior.
    tpb=32,
)
bad_tagged_gdf.head(20).to_pandas() # most of these will be good values

Unnamed: 0,time_bin,sensor_id,time,temp,humidity,bad
0,0,A,0.005,22.335522,0.467777,0
1,0,A,0.007,20.628517,0.489346,0
2,0,A,0.012,21.175602,0.482434,0
3,0,A,0.013,22.518532,0.465465,0
4,0,A,0.017,21.654493,0.476383,0
5,0,A,0.018,19.024134,0.509619,0
6,0,A,0.021,20.450075,0.491601,0
7,0,A,0.024,20.917501,0.485695,0
8,0,A,0.025,22.370373,0.467337,0
9,0,A,0.027,22.19105,0.469603,0


If we want to see just the temperature samples that failed the test, we can select them out with `query()`:

In [23]:
bad_tagged_gdf.query('bad == 1').temp.nlargest(10).to_pandas()

604430    48.201454
878804    48.067596
702695    47.989872
888253    47.676571
733252    47.316669
609146    47.278095
842673    47.066006
523866    46.372456
608021    45.988064
843641    45.987778
dtype: float32

In [24]:
bad_tagged_gdf.query('bad == 0').temp.nlargest(10).to_pandas()

620991    39.570740
645698    38.800659
744413    38.731720
740898    38.565994
632257    38.479790
989006    38.455040
621341    38.163860
619041    38.061131
640830    37.938618
996812    37.926849
dtype: float32

And now we can compute our mean temperature using only the good measurements:

In [18]:
filtered_group = bad_tagged_gdf.query('bad == 0').groupby(['time_bin', 'sensor_id']).agg(dict(time='min', temp='mean'))
filtered_group.head(20).to_pandas()

Unnamed: 0,time_bin,sensor_id,time,temp
0,0,A,2.500937,21.752899
1,0,B,2.479421,21.710459
2,0,C,2.517933,21.684062
3,1,A,7.571181,21.675455
4,1,B,7.466885,21.654796
5,1,C,7.46349,21.62459
6,2,A,12.499554,21.450349
7,2,B,12.507045,21.567419
8,2,C,12.491396,21.479624
9,3,A,17.502512,21.307641
