# GPU-accelerated Data Science Workflow

## Human Activity Recognition using GPU DataFrame and GPU KMeans

Analyzing smart phone sensors to determine the activity the person is engaged in.


- Activities: biking, sitting, standing, walking, stairup, stairdown.
- Sensors: Accelerometer and Gyroscope.
- Sampling rate: highest frequency the respective device allows.

Link to the dataset: http://archive.ics.uci.edu/ml/datasets/Heterogeneity+Activity+Recognition


Our approach uses KMeans from the `h2o4gpu` package to form the initial clusters.  Then, we use nearest neighbour to classify the clusters; i.e. the intra-cluster dominating class determines the class for the cluster.  During the classification, we choose the class of the closest cluster center.

The data is preprocessed with `pygdf` DataFrame.  Initially, each row in the dataset is a single event.  Our preprocessing bins the event into frames and transpose the events in each frame into columns for a single record.  We then perform wavelet transformation to convert the time-domain into time-frequency domain.  These are done with the help of custom CUDA kernels written with `numba`.


In [None]:
# This automatically time every cell's execution
!pip install ipython-autotime
%load_ext autotime

In [None]:
import numpy as np
import pandas as pd

import pygdf
from numba import cuda

In [None]:
# import holoviews as hv
# import bokeh.palettes

# hv.extension('bokeh')

In [None]:
import logging
logging.disable(logging.WARNING)

## Load data from CSV

In [None]:
nrows = 10000000

In [None]:
def read_data(path):
    print('reading', path)
    # Read using pandas 
    df = pd.read_csv(path, nrows=nrows, index_col='Index')
    print('shape =', df.shape)
    return df

In [None]:
# import os

# if not os.path.exists('data/har/Activity recognition exp'):
#     # Unzip data if not already there
#     !unzip "data/har/Activity recognition exp.zip" -d data/har
#     !ls data/har

In [None]:
!wc -l "Phones_accelerometer.csv"

In [None]:
pa_df = read_data('Phones_accelerometer.csv')
pg_df = read_data('Phones_gyroscope.csv')

Preprocess categorical columns

In [None]:
from collections import defaultdict

cats = defaultdict(set)
for df in [pa_df, pg_df]:
    for col in ['Model', 'User', 'Device', 'gt']:
        df[col] = df[col].astype('category')
        cats[col] |= set(df[col].astype('category').cat.categories)
        #cats[col].add('null')

for col in ['Model', 'User', 'Device', 'gt']:
    ordered_cats = sorted(cats[col])
    print(col, ordered_cats)
    for df in [pa_df, pg_df]:
        df[col].cat.set_categories(ordered_cats)
        #df[col] = df[col].fillna('null')
        df[col] = df[col].cat.codes

In [None]:
# activities = tuple(df['gt'].cat.categories)
# activities

### Convert to GPU DataFrame

In [None]:
pa_df = pygdf.DataFrame.from_pandas(pa_df)
pg_df = pygdf.DataFrame.from_pandas(pg_df)

In [None]:
pa_df.dtypes

In [None]:
pa_df.head().to_pandas()

In [None]:
pg_df.head().to_pandas()

## More preprocessing on the GPU

### Exclude `gt == null`

In [None]:
#null_idx = pa_df['gt'].cat.categories.index('null')

In [None]:
print('before', len(pa_df), len(pg_df))
#pa_df = pa_df.query('gt != @null_idx').reset_index()
#pg_df = pg_df.query('gt != @null_idx').reset_index()
pa_df = pa_df.query('gt != -1').reset_index()
pg_df = pg_df.query('gt != -1').reset_index()
print('after', len(pa_df), len(pg_df))

### Scale x, y, z

In [None]:
def rescale(sr):
    maxval = max(abs(sr.max()), abs(sr.min()))
    return sr / maxval

for df in [pa_df, pg_df]:
    for col in 'xyz':
        df[col] = rescale(df[col])

### Bin the time series

In [None]:
dt = 1280000000
subsample_size = 2**5
print('subsample_size', subsample_size)
dt2 = dt / subsample_size
pa_df['resampled'] = (pa_df['Creation_Time'] // dt)
pg_df['resampled'] = (pg_df['Creation_Time'] // dt)
pa_df['resampled_inner'] = (pa_df['Creation_Time'] // dt2)
pg_df['resampled_inner'] = (pg_df['Creation_Time'] // dt2)

### Use Numba to JIT compile GPU kernels for transposing rows into columns

In [None]:
from numba import cuda, float64


@cuda.jit
def is_valid(offsets, valid):
    idx = cuda.grid(1)
    if idx < valid.size:
        s = offsets[idx]
        e = offsets[idx + 1]
        valid[idx] = e - s == subsample_size

@cuda.jit
def expand_df(offsets, inp, out):
    blkid = cuda.blockIdx.x
    tid = cuda.threadIdx.x
    row_start = offsets[blkid]
    row_stop = offsets[blkid + 1]
    if tid < out.shape[0]:
        val = np.nan
        if row_stop - row_start == out.shape[0]:
            val = inp[row_start + tid]

        out[tid, blkid] = val

### JIT custom GPU kernels for simple wavelet decomposition

In [None]:
@cuda.jit(device=True)
def inner_haar_wavelet(arr):
    # assume pow-of-2 subsample_size
    stride = subsample_size // 2
    while stride:
        i = cuda.threadIdx.x
        if i < stride:
            even = arr[2 * i]
            odd = arr[2 * i + 1]
            c0 = (even + odd) / 2
            c1 = (even - odd) / 2
        cuda.syncthreads()
        if i < stride:
            arr[i] = c0
            arr[i + stride] = c1
        cuda.syncthreads()
        stride //= 2


@cuda.jit
def haar_wavelet(arr):
    sm_arr = cuda.shared.array((subsample_size,), dtype=float64)
    blkid = cuda.blockIdx.x
    i = cuda.threadIdx.x
    sm_arr[i] = arr[i, blkid]
    cuda.syncthreads()

    inner_haar_wavelet(sm_arr)

    arr[i, blkid] = sm_arr[i]

            

### Use GPU Groupby to resample and bin the time series data

In [None]:
def expand(src_df):
    # First groupby creates the frames. Each frame has 20 rows.
    groupkeys = ['resampled', 'User', 'Model', 'Device', 'gt', 'resampled_inner']
    df = src_df.loc[:, groupkeys + ['x', 'y', 'z']].groupby(groupkeys).mean()
    # Second groupby transpose the rows in each frame into columns
    grouped, segs = df.groupby(groupkeys[:-1]).as_df()
    numgroups = len(segs)
    d_offsets = cuda.to_device(np.hstack([segs.to_array(), len(grouped)]))

    d_valid = cuda.device_array(numgroups, dtype=np.bool_)
    d_xs = cuda.device_array((subsample_size, numgroups), dtype=np.float64)
    d_ys = cuda.device_array((subsample_size, numgroups), dtype=np.float64)
    d_zs = cuda.device_array((subsample_size, numgroups), dtype=np.float64)
    
    # Launch transposing CUDA kernel
    # Each row of x, y, z becomes columns
    expand_df[numgroups, subsample_size](d_offsets, grouped['x'].to_gpu_array(), d_xs)
    expand_df[numgroups, subsample_size](d_offsets, grouped['y'].to_gpu_array(), d_ys)
    expand_df[numgroups, subsample_size](d_offsets, grouped['z'].to_gpu_array(), d_zs)
    is_valid.forall(d_offsets.size - 1)(d_offsets, d_valid)
    
    # Use wavelet to decompose the time-domain data into time-frequency data
    haar_wavelet[numgroups, subsample_size](d_xs)
    haar_wavelet[numgroups, subsample_size](d_ys)
    haar_wavelet[numgroups, subsample_size](d_zs)
    
    # Creates the final resampled dataframe.
    # Now, each row is a frame.
    outdf = pygdf.DataFrame()
    outdf['resampled'] = grouped['resampled'].take(d_offsets[:-1], ignore_index=True)
    outdf['User'] = grouped['User'].take(d_offsets[:-1], ignore_index=True)
    outdf['Device'] = grouped['Device'].take(d_offsets[:-1], ignore_index=True)
    outdf['gt'] = grouped['gt'].take(d_offsets[:-1], ignore_index=True)
    outdf['valid'] = d_valid
    for colname, colvals in zip('xyz', [d_xs, d_ys, d_zs]):
        for i in range(subsample_size):
            fullcolname = '{}{}'.format(colname, i)
            sr = pygdf.Series(colvals[i])
            outdf[fullcolname] = rescale(sr)
    
    out = outdf.query("valid")
    out.drop_column("valid")
    return out
    

Resample the accelerometer data

In [None]:
newpadf = expand(pa_df)
print(len(newpadf))
newpadf.head().to_pandas()

Resample the gyroscope data

In [None]:
newpgdf = expand(pg_df)
print(len(newpgdf))
newpgdf.head().to_pandas()

### Use Inner-Join to combine the accelerometer and gyroscope data on the GPU

In [None]:
joined = newpadf.set_index('resampled').join(newpgdf.set_index('resampled'), how='inner',
                                             lsuffix='_a', rsuffix='_g')
joined.head().to_pandas()

### Filter the joined table.  Drop mismatching rows.

In [None]:
filtered = joined.query("User_a == User_g and gt_a == gt_g and Device_a == Device_g")
print(len(filtered))
filtered.head().to_pandas()

### Create features table for KMeans

In [None]:
feature_columns = ['gt_a'] + [k for k in filtered.columns if k[0] in 'xyz']
features_df = filtered.loc[:, feature_columns]
print(len(features_df))
features_df.head().to_pandas()

### Randomize data and split into training and testing

In [None]:
def split_train_test(df):
    idx = np.arange(len(df))
    np.random.shuffle(idx)
    d_idx = cuda.to_device(idx)
    outdf = pygdf.DataFrame()
    for k in df.columns:
        outdf[k] = df[k].take(d_idx, ignore_index=True) 
        
    split_pt = int(len(outdf) * 0.8)
    return outdf[:split_pt], outdf[split_pt:]
    
train_df, test_df = split_train_test(features_df)
print('train size', len(train_df))
print('test size', len(test_df))

train_df.head().to_pandas()

### Create feature matrix

In [None]:
feature_columns = [k for k in train_df.columns if k !='gt_a']
feature_matrix = train_df.as_matrix(feature_columns)

feature_matrix.shape

## GPU Machine Learning with H2O

### Use `h2o4gpu.KMeans` to cluster the data

In [None]:
import h2o4gpu

kmeans = h2o4gpu.KMeans(n_clusters=80, n_gpus=1, max_iter=1000)
kmeans.fit(feature_matrix)
predicted = kmeans.predict(feature_matrix)

### Find meaning for each cluster

For each cluster, the dominating class is adopted as the class for the cluster.

In [None]:
pred_df = pygdf.DataFrame()

gt_predicted = pygdf.Series(np.asarray(predicted, dtype=np.int32))
pred_df['gt_predicted'] = gt_predicted.set_index(train_df.index)

for k in features_df.columns:
    pred_df[k] = train_df[k]

pred_df.head().to_pandas()

One-hot encode the "activities" (`gt_a` column).  The will serve as "vote" to determine the class for each cluster.

In [None]:
out_df = pred_df.copy()
for name, col in zip(activities, 
                     pred_df['gt_a'].astype(np.int32).one_hot_encoding(cats=list(range(len(activities))))):
    out_df[name] = col.set_index(pred_df.index)
    
out_df.head().to_pandas()

Count the "vote" for each cluster using groupby-mean

In [None]:
cols = [k for k in out_df.columns if k.startswith('gt') and k !='gt_a' or k in activities]
finaldf = out_df.loc[:, cols].groupby('gt_predicted').mean()
finaldf.head().to_pandas()

Associate each cluster with the dominating activity.

In [None]:
from collections import defaultdict

predict_dict = {}
for row in range(len(finaldf)):
    r = finaldf.gt_predicted[row]
    best = max((finaldf[k][row], k) for k in activities)
    print('cluster {}: {:.1f}% {}'.format(r,
                                          best[0] * 100, 
                                          best[1]))
    predict_dict[r] = "{}.{}".format(best[1], r)
    
predict_cats = [predict_dict.get(i, 'unknown.{}'.format(i)) for i in range(len(kmeans.cluster_centers_))]

### Evaluate the model with the testing dataset

In [None]:
test_matrix = test_df.as_matrix(feature_columns)
test_matrix.shape

In [None]:
predict_test = kmeans.predict(test_matrix)

In [None]:
eval_df = pygdf.DataFrame()

eval_df['gt_predicted'] = pd.Categorical.from_codes(predict_test, categories=predict_cats)
eval_df['gt_a'] = test_df['gt_a'].reset_index()

eval_df.head(n=30).to_pandas()

Compute prediction accuracy

In [None]:
matches = eval_df.to_pandas().apply(lambda sr: sr['gt_predicted'].split('.')[0] == sr['gt_a'], axis=1)
num_correct = len(matches[matches == True])
num_total = len(matches)
print('{:.1f}% correctly predicted'.format(100 * num_correct / num_total))

### Visualize the clusters

Use LDA to 2D-project our training data

In [None]:
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA

lda = LDA(n_components=2)
lda_feat_mat = lda.fit(feature_matrix, train_df['gt_a'].to_array()).transform(feature_matrix)

In [None]:
cluster_df = pygdf.DataFrame()
cluster_df['x'] = np.require(lda_feat_mat[:, 0], requirements='C')
cluster_df['y'] = np.require(lda_feat_mat[:, 1], requirements='C')
cluster_df['activity'] = pred_df['gt_a']

predicted_clusters = pd.Categorical.from_codes(pred_df['gt_predicted'].to_pandas(), categories=predict_cats)
cluster_df['prediction'] = pd.Categorical([k.split('.')[0] for k in predicted_clusters])

cluster_df.head().to_pandas()

In [None]:
%%opts Points [width=480, height=400]

plot_df = cluster_df.to_pandas().sample(min(2000, len(cluster_df)))

cmap = bokeh.palettes.Set1[8]

pts_expected = {k : hv.Points(plot_df.loc[plot_df['activity'] == k])(style={'color': col}) 
                for k, col in zip(activities, cmap)}

pts_predicted = {k : hv.Points(plot_df.loc[plot_df['prediction'] == k])(style={'color': col}) 
                for k, col in zip(activities, cmap)}

left = hv.NdOverlay(pts_expected, kdims=['activity'])
right = hv.NdOverlay(pts_predicted, kdims=['activity'])

left.relabel('Expected') + right.relabel('Predicted')

### Search for best k in k-means

In [None]:
def eval_cluster_size(n_clusters, test_df):
    # Clustering
    kmeans = h2o4gpu.KMeans(n_clusters=n_clusters, n_gpus=1, max_iter=1000)
    kmeans.fit(feature_matrix)
    predicted = kmeans.predict(feature_matrix)

    # Determine cluster meaning
    pred_df = pygdf.DataFrame()

    gt_predicted = pygdf.Series(np.asarray(predicted, dtype=np.int32))
    pred_df['gt_predicted'] = gt_predicted.set_index(train_df.index)

    for k in features_df.columns:
        pred_df[k] = train_df[k]

    out_df = pred_df.copy()
    for name, col in zip(activities, 
                         pred_df['gt_a'].astype(np.int32).one_hot_encoding(cats=list(range(len(activities))))):
        out_df[name] = col.set_index(pred_df.index)


    cols = [k for k in out_df.columns if k.startswith('gt') and k !='gt_a' or k in activities]
    finaldf = out_df.loc[:, cols].groupby('gt_predicted').mean()


    predict_dict = {}
    for row in range(len(finaldf)):
        r = finaldf.gt_predicted[row]
        best = max((finaldf[k][row], k) for k in activities)
        predict_dict[r] = "{}.{}".format(best[1], r)

    predict_cats = [predict_dict.get(i, 'unknown.{}'.format(i)) for i in range(len(kmeans.cluster_centers_))]

    # Evaluate
    test_matrix = test_df.as_matrix(feature_columns)

    predict_test = kmeans.predict(test_matrix)

    eval_df = pygdf.DataFrame()

    eval_df['gt_predicted'] = pd.Categorical.from_codes(predict_test, categories=predict_cats)
    eval_df['gt_a'] = test_df['gt_a'].reset_index()

    eval_df.head(n=30).to_pandas()

    matches = eval_df.to_pandas().apply(lambda sr: sr['gt_predicted'].split('.')[0] == sr['gt_a'], axis=1)
    num_correct = len(matches[matches == True])
    num_total = len(matches)
    correctness = num_correct / num_total
    print('n_clusters={} | {:.1f}% correctly predicted'.format(n_clusters, 100 * correctness))
    return correctness
    

print("Scan testing data")
cc_test = [(k, eval_cluster_size(k, test_df=test_df)) for k in range(10, 500, 50)]
print("Scan training data")
cc_train = [(k, eval_cluster_size(k, test_df=train_df)) for k in range(10, 500, 50)]

Plot accuracy vs num of clusters

In [None]:
%%opts Curve [width=500, height=400, show_grid=True]


def cc_make_df(cc):
    xs, ys = zip(*cc)
    df = pd.DataFrame()
    df['num_cluster'] = xs
    df['accuracy %'] = np.asarray(ys) * 100
    return df


df_test = cc_make_df(cc_test)
df_train = cc_make_df(cc_train)

title = 'Accuracy vs Number of Clusters'
(hv.Scatter(df_test) * hv.Curve(df_test, label='test') * hv.Scatter(df_train) * hv.Curve(df_train, label='train')).relabel(title)