# End to End GPU Processing With PyMapD, PyGDF, and H2O4GPU

In [1]:
import os
from pymapd import connect
import pygdf
import pyarrow
import pandas as pd
import numpy as np
import h2o4gpu
from h2o4gpu import GradientBoostingRegressor

In [2]:
con = connect(user="mapd", password=os.environ["mapd_password"],
                  host="ec2-12-345-678-910.compute-1.amazonaws.com",dbname="mapd")

In [3]:
con.get_tables()

['taxi_weather_tracts_factual',
 'nyc_trees_2015_683k',
 'flights_2008_7M',
 'trips']

In [4]:
trips_query = "SELECT total_amount, passenger_count, precipitation, trip_distance \
                FROM trips \
                LIMIT 1000000"

In [5]:
c = con.cursor()

In [6]:
c.execute(trips_query)

<pymapd.cursor.Cursor at 0x7fb47eef3978>

In [7]:
trips = pygdf.DataFrame.from_pandas(pd.DataFrame(c.fetchall())\
                                    .rename(columns={0:"total_amount", 1:"passenger_count",
                                                     2:"precipitation", 3:"trip_distance"}))

In [8]:
trips.to_pandas().head()

Unnamed: 0,total_amount,passenger_count,precipitation,trip_distance
0,7.0,1,0,0.87
1,4.8,2,0,0.3
2,18.12,1,0,3.04
3,8.0,1,0,1.1
4,27.0,2,0,6.2


## Split the data into training, validation, and test sets 

In [9]:
# enforce float64 data type on ALL columns
for k in trips.columns:
    trips[k] = trips[k].astype(np.float64)

# set the fractions for training and validation
fractions = {
    "train": 0.8,
    "valid": 0.2
}

# validation splitpoint
splitpoint = int(len(trips) * fractions["train"])
print('splitpoint: {} of {} is {}'.format(fractions["train"], len(trips), splitpoint))

# break the gdf up into training, validation, and test sets
gdfs = {
    "train": trips.loc[:splitpoint],
    "valid": trips.loc[splitpoint:]
}
print('gdfs["train"] has {} rows'.format(len(gdfs["train"])))
print('gdfs["valid"] has {} rows'.format(len(gdfs["valid"])))


splitpoint: 0.8 of 1000000 is 800000
gdfs["train"] has 800001 rows
gdfs["valid"] has 200000 rows


## Convert GDFs to GPU Matrices 

In [10]:
# produce gpu matrices (to input to ml libraries, etc)# produc 
# this step should not be necessary in the near future
# (should be able to use gdf as input)
matrices = {
    "train": {
        "x": gdfs["train"].as_gpu_matrix(columns=trips.columns[1:]),
        "y": gdfs["train"].as_gpu_matrix(columns=[trips.columns[0]])
    },
    "valid": {
        "x": gdfs["valid"].as_gpu_matrix(columns=trips.columns[1:]),
        "y": gdfs["valid"].as_gpu_matrix(columns=[trips.columns[0]])
    }
}

# check the matrix shapes (sanity check)
print('matrices["train"]["x"] shape:', matrices["train"]["x"].shape)
print('matrices["train"]["y"] shape:', matrices["train"]["y"].shape)
print('matrices["valid"]["x"] shape:', matrices["valid"]["x"].shape)
print('matrices["valid"]["y"] shape:', matrices["valid"]["y"].shape)

matrices["train"]["x"] shape: (800001, 3)
matrices["train"]["y"] shape: (800001, 1)
matrices["valid"]["x"] shape: (200000, 3)
matrices["valid"]["y"] shape: (200000, 1)


## Obtain pointers to the matrices 

In [11]:
# get pointers (so we can keep data on gpu)
# this step should not be necessary in the near future
# (should be able to use gdf as input)

from ctypes import *

def get_pointer(matrix):
    return c_void_p(matrix.device_ctypes_pointer.value)

# Train XGBoost Model With H2O4GPU

In [12]:
# ensure that we use the h2o4gpu backend (not sklearn)
xgb = GradientBoostingRegressor(backend = "h2o4gpu")

# convert input data from gdf to cpu matrices (numpy ndarrays)
cpu_matrices = {
    "train": {
        "x": gdfs["train"].as_matrix(columns=trips.columns[1:]),
        "y": gdfs["train"].as_matrix(columns=[trips.columns[0]]).flatten()
    },
    "test": {
        "x": gdfs["valid"].as_matrix(columns=trips.columns[1:]),
        "y": gdfs["valid"].as_matrix(columns=[trips.columns[0]]).flatten()
    }
}

# set the base parameters
num_rounds = 10
xgb_params = {
    "learning_rate": 0.1,
    "n_estimators": 100,
    "subsample": 1.0,
    "n_gpus": 1
}
xgb.set_params(**xgb_params)

XGBRegressor(backend='h2o4gpu', base_score=0.5, booster='gbtree',
       colsample_bylevel=1, colsample_bytree=1.0, gamma=0,
       learning_rate=0.1, max_delta_step=0, max_depth=3,
       min_child_weight=1, missing=None, n_estimators=100, n_gpus=-1,
       n_jobs=1, nthread=None, num_parallel_tree=1, objective='reg:linear',
       predictor='gpu_predictor', random_state=0, reg_alpha=0,
       reg_lambda=1, scale_pos_weight=1, seed=None, silent=True,
       subsample=1.0, tree_method='gpu_hist')

In [13]:
%%time
# fit the model
xgb.fit(X = cpu_matrices["train"]["x"], y = cpu_matrices["train"]["y"])

CPU times: user 1.05 s, sys: 269 ms, total: 1.32 s
Wall time: 1.32 s


XGBRegressor(backend='h2o4gpu', base_score=0.5, booster='gbtree',
       colsample_bylevel=1, colsample_bytree=1.0, gamma=0,
       learning_rate=0.1, max_delta_step=0, max_depth=3,
       min_child_weight=1, missing=None, n_estimators=100, n_gpus=-1,
       n_jobs=1, nthread=None, num_parallel_tree=1, objective='reg:linear',
       predictor='gpu_predictor', random_state=0, reg_alpha=0,
       reg_lambda=1, scale_pos_weight=1, seed=None, silent=True,
       subsample=1.0, tree_method='gpu_hist')

In [14]:
# predict based upon test values
xgb_predictions = xgb.model.predict(cpu_matrices["test"]["x"])

# show the first 20 results
print(xgb_predictions[0:20])

[ 9.730211   9.34366   12.54071    9.06083   14.327643  11.88019
  5.5042825 11.862956   6.4842734 11.178525   7.1239367  7.1239367
  9.06083    5.5042825  8.707001  14.658997  15.037864  13.892837
  8.78333    9.988817 ]
