# GPU-Enabled Deep Learning with Greenplum using TensorFlow and Keras

This Jupyter (IPython) notebook walks the user through training and testing a deep learning model built with TensorFlow and Keras in Greenplum 5.x. The model is built inside a PL/Python user-defined function (UDF), which is invoked from SQL with the training/test data passed in.

Connect to the GPU-enabled GCP host running Greenplum. The SQL cells below talk to it through the ipython-sql `%sql` magic; the data-loading cell uses `psql`.

In [None]:
# Load the ipython-sql extension, which provides the %sql / %%sql magics used below.
%load_ext sql

In [None]:
# Greenplum Database 5.4.0 on GCP with NVIDIA Tesla K80 GPU (demo machine)
greenplum_gpu_db = "postgresql://<username>@<ip>:<postgres_port>/<dbname>"
%sql $greenplum_gpu_db

Load the MNIST dataset into Greenplum tables

In [None]:
%%bash -s $greenplum_gpu_db
# Unzip the MNIST SQL dumps and load them into Greenplum.
# $1 is the connection URL passed in via `-s`; it is quoted so unusual
# characters in the URL are not word-split by the shell.
set -e

for split in mnist_train mnist_test; do
    # Keep an uncompressed copy next to the archive.
    gunzip -c "../data/${split}.sql.gz" > "../data/${split}.sql"
    # ON_ERROR_STOP makes psql exit non-zero on the first SQL error instead of
    # carrying on and returning success, so a failed load fails this cell.
    psql -v ON_ERROR_STOP=1 "$1" -f "../data/${split}.sql"
done

In [None]:
%%sql
-- Preview two rows of the freshly loaded training table.
select * from mnist_train limit 2;

Define a SQL aggregate function that copies the MNIST dataset (stored in a SQL table) into the PL/Python global dictionary (GD), which is shared by all PL/Python functions within a database session. This lets the model training/testing functions access the MNIST training and test data. GD is empty at the start of each session.

In [None]:
%%sql
drop function if exists add_data_to_GD(
    text[],
    text,
    text[],
    float8[],
    float8
) cascade;
create or replace function add_data_to_GD(
    key text[],  -- aggregate state: [features GD key, labels GD key]; NULL for the first row
    table_name text, -- label used to prefix the GD keys (e.g. 'mnist_train')
    col_names text[], -- [name of the features column, name of the dependent variable column]
    features float8[], -- independent variables (as array)
    label float8 -- dependent variable value
)
returns text[]
as
$$
    # State transition function for add_data_to_GD_agg.
    # Appends this row's features and label to two lists in GD, keyed
    # '<table_name>_<col_names[0]>' and '<table_name>_<col_names[1]>', and returns
    # those two keys as the new aggregate state.
    gd_feature_key = table_name + '_' + col_names[0]
    gd_label_key = table_name + '_' + col_names[1]

    if not key:
        # Empty state => a new aggregation started. Reset the lists and the stored
        # column names instead of reusing values left in GD by an earlier query.
        GD[table_name + '_col_names'] = col_names
        GD[gd_feature_key] = [features]
        GD[gd_label_key] = [label]
        return [gd_feature_key, gd_label_key]

    GD[gd_feature_key].append(features)
    GD[gd_label_key].append(label)
    return key
$$language plpythonu;

drop aggregate if exists add_data_to_GD_agg( 
    text, -- table name
    text[], -- header (feature names)
    float8[], -- features (feature values),
    float8 -- labels
) cascade;
create aggregate add_data_to_GD_agg(
        text, -- table name
        text[], -- header (feature names)
        float8[], -- features (feature values),
        float8 -- labels
    )
(
    SFUNC = add_data_to_GD,
    STYPE = text[] -- the two GD keys holding the accumulated features and labels
);

In [None]:
%%sql
create or replace function viewGD()
returns text
as
$$
   # Return the whole PL/Python global dictionary rendered as text.
   return str(GD)
$$language plpythonu;

In [None]:
%%sql
create or replace function clearGD()
returns void
as
$$
   # Remove every entry from GD, including the MNIST feature/label lists
   # and staged model rows written by the aggregates in this notebook,
   # so the memory they hold in this backend can be released.
   GD.clear()
$$language plpythonu;

In [None]:
%%sql
-- Inspect the current contents of GD.
SELECT viewGD();

In [None]:
%%sql
create or replace function usingGPU()
returns text
as
$$
   # Report the TensorFlow devices visible to this Greenplum backend process
   # as a comma-separated list of device names (a GPU entry appears only if
   # TensorFlow can use one here).
   from tensorflow.python.client import device_lib
   return ', '.join(d.name for d in device_lib.list_local_devices())
$$language plpythonu;

In [None]:
%%sql
-- Ask the database backend which TensorFlow devices it can see.
select usingGPU();

Define the PL/Python function keras_train, callable from SQL, to train a Keras deep learning model on the provided MNIST training set. The model is a simple dense neural network with two hidden layers of 512 ReLU units, each followed by dropout regularization, and a softmax output layer. The function returns the trained model in serialized form: a text array holding the model's architecture (JSON) and its weights (pickled).

In [None]:
%%sql
create or replace function keras_train(train_keys text[],  test_keys text[], use_cpu_only boolean)
returns text[]
as
$$
    # Train a dense Keras classifier on MNIST data staged in GD by add_data_to_GD_agg.
    #   train_keys / test_keys: [features_key, labels_key] as returned by add_data_to_GD_agg.
    #     Features are scaled by 1/255 and labels one-hot encoded into num_classes columns.
    #     The test split is passed to fit() as validation data (reported per epoch only;
    #     it does not affect the learned weights).
    #   use_cpu_only: when true, the TensorFlow session is created without GPU devices.
    # Returns [model architecture JSON, pickle (protocol 0) of model.get_weights()].
    # Not IMMUTABLE: the result depends on GD contents and on random initialisation.
    import os
    import pickle

    import numpy as np
    import tensorflow as tf
    import keras
    from keras import backend as K
    from keras.models import Sequential
    from keras.layers import Dense, Dropout
    from keras.optimizers import RMSprop

    plpy.info("Start")
    plpy.info(os.popen('hostname').read().strip())

    batch_size = 128
    num_classes = 10
    epochs = 5

    def load_split(keys):
        # keys = [features_key, labels_key]; returns (scaled features, one-hot labels).
        x = np.asarray(GD[keys[0]], dtype=np.float32) / 255
        y = np.asarray(GD[keys[1]], dtype=np.float32)
        # Always pass num_classes so train and test targets have the same width
        # even if a split does not contain the highest label.
        return x, keras.utils.to_categorical(y, num_classes)

    x_train, y_train = load_split(train_keys)
    x_test, y_test = load_split(test_keys)

    # CUDA_VISIBLE_DEVICES is only read when CUDA is first initialised in this backend
    # process, and an os.environ change would persist for every later call (hiding the
    # GPU from later GPU runs too). Instead, start each call on a fresh Keras graph with
    # a session whose device set matches use_cpu_only.
    if use_cpu_only:
        config = tf.ConfigProto(device_count={'GPU': 0}, allow_soft_placement=True)
        device_flag = '/cpu:0'
    else:
        config = tf.ConfigProto(allow_soft_placement=True)
        device_flag = '/gpu:0'
    K.clear_session()  # also stops the default graph growing on every call
    K.set_session(tf.Session(config=config))

    with tf.device(device_flag):
        # Debugging: devices usable by this session; should only list a CPU
        # device when use_cpu_only is true.
        plpy.info([d.name for d in K.get_session().list_devices()])

        # Build the model inside the device scope so its variables and forward
        # ops (not only the training ops created by fit) land on device_flag.
        model = Sequential()
        model.add(Dense(512, activation='relu', input_shape=(784,)))
        model.add(Dropout(0.2))
        model.add(Dense(512, activation='relu'))
        model.add(Dropout(0.2))
        model.add(Dense(num_classes, activation='softmax'))

        model.compile(loss='categorical_crossentropy',
                      optimizer=RMSprop(),
                      metrics=['accuracy'])

        model.fit(x_train, y_train,
                  batch_size=batch_size,
                  epochs=epochs,
                  verbose=1,
                  validation_data=(x_test, y_test))

        result = [model.to_json(),
                  pickle.dumps(model.get_weights(), protocol=0)]

    plpy.info("Finished training")
    return result
$$language plpythonu;

Call this PL/Python function from SQL, using the aggregates to load the training and test sets into GD and pass in their keys. Store the output (the serialized model) in a SQL table for use in the prediction phase of this module. Note: if the installed TensorFlow package is tensorflow-gpu, and CUDA 9.0 with a matching cuDNN is installed, the NVIDIA Tesla K80 GPU is used for training. The second cell repeats the training with use_cpu_only set to true for comparison.

In [None]:
%%sql

-- Use GPU: train with use_cpu_only = false and keep the serialized model.
drop table if exists results_gpu_train, test_table;
create table results_gpu_train (
    model_architecture text,
    model_weights text
);
-- test_table holds the single text[] returned by keras_train:
-- element 1 = architecture JSON, element 2 = pickled weights.
create table test_table as (
    select keras_train(stacked_train_keys, stacked_test_keys, false)
    from (
        select add_data_to_GD_agg('mnist_train', ARRAY['x', 'y'], x, y) as stacked_train_keys
        from mnist_train
    ) q1,
    (
        select add_data_to_GD_agg('mnist_test', ARRAY['x', 'y'], x, y) as stacked_test_keys
        from mnist_test
    ) q2
);
insert into results_gpu_train (model_architecture, model_weights)
select keras_train[1], keras_train[2]
from test_table;
select model_architecture from results_gpu_train;

In [None]:
%%sql

-- Use CPU: same training as the GPU cell, but with use_cpu_only = true.
drop table if exists results_cpu_train, test_table;
create table results_cpu_train (
    model_architecture text,
    model_weights text
);
-- test_table holds the single text[] returned by keras_train:
-- element 1 = architecture JSON, element 2 = pickled weights.
create table test_table as (
    select keras_train(stacked_train_keys, stacked_test_keys, true)
    from (
        select add_data_to_GD_agg('mnist_train', ARRAY['x', 'y'], x, y) as stacked_train_keys
        from mnist_train
    ) q1,
    (
        select add_data_to_GD_agg('mnist_test', ARRAY['x', 'y'], x, y) as stacked_test_keys
        from mnist_test
    ) q2
);
insert into results_cpu_train (model_architecture, model_weights)
select keras_train[1], keras_train[2]
from test_table;
select model_architecture from results_cpu_train;

In [None]:
import tensorflow as tf

# Runs in the local Jupyter kernel, NOT inside Greenplum: reports whether the
# TensorFlow installed in this notebook's environment can see a GPU.
# tf.test.gpu_device_name() returns '' when no GPU is available.
tf.test.gpu_device_name() or 'No GPU visible to this kernel'

Define the PL/Python function keras_predict, callable from SQL, to evaluate the trained Keras model (stored in a results table by the training cells above) on the MNIST test set. The function first deserializes and recreates the Keras model, then computes its loss and accuracy on the test data. It returns the hostname, loss, and accuracy as a keras_predict_type row.

In [None]:
%%sql
-- CASCADE also drops keras_predict, which returns this type; re-run that cell afterwards.
DROP TYPE IF EXISTS keras_predict_type CASCADE;
CREATE TYPE keras_predict_type
AS
(
    hostname text, -- hostname of the machine on which keras_predict ran
    loss text, -- formatted test loss, e.g. 'Test loss: <value>'
    accuracy text -- formatted test accuracy, e.g. 'Test accuracy: <value>'
);

In [None]:
%%sql
create or replace function keras_predict(model_data text, test_keys text[])
returns keras_predict_type
as
$$
    # Evaluate a model staged in GD by add_model_to_GD_agg on test data staged by
    # add_data_to_GD_agg.
    #   model_data: GD key returned by add_model_to_GD_agg; GD[model_data] is a list of
    #     [model_architecture, model_weights] rows and only the first row is used.
    #   test_keys: [features_key, labels_key] as returned by add_data_to_GD_agg.
    # Returns [hostname, 'Test loss: ...', 'Test accuracy: ...'] as keras_predict_type.
    # Not IMMUTABLE: the result depends on GD contents, not only on the arguments.
    import os
    import pickle

    import numpy as np
    import keras
    from keras import backend as K
    from keras.models import model_from_json
    from keras.optimizers import RMSprop

    hostname = os.popen('hostname').read().strip()
    plpy.info("Start")
    plpy.info(hostname)

    batch_size = 128
    num_classes = 10

    # Use the key returned by the aggregate rather than a hard-coded table label,
    # so any label passed to add_model_to_GD_agg works.
    model_architecture, model_weights = GD[model_data][0]

    x_test = np.asarray(GD[test_keys[0]], dtype=np.float32) / 255
    # num_classes keeps the target width equal to the model's output width even if
    # the test split lacks the highest label.
    y_test = keras.utils.to_categorical(
        np.asarray(GD[test_keys[1]], dtype=np.float32), num_classes)

    # Start from a fresh Keras graph; the backend session persists across calls.
    K.clear_session()
    model = model_from_json(model_architecture)
    # NOTE(review): pickle.loads can execute arbitrary code embedded in the payload;
    # only load weights from tables written by trusted code (keras_train).
    model.set_weights(pickle.loads(model_weights))

    # compile() is required before evaluate(); the optimizer is never stepped here,
    # RMSprop just matches the one used by keras_train.
    model.compile(loss='categorical_crossentropy',
                  optimizer=RMSprop(),
                  metrics=['accuracy'])

    score = model.evaluate(x_test, y_test, batch_size=batch_size, verbose=0)
    return [hostname,
            "Test loss: {0}".format(score[0]),
            "Test accuracy: {0}".format(score[1])]
$$language plpythonu;

The user-defined aggregate add_model_to_GD_agg serves the same purpose as add_data_to_GD_agg above, but accepts text inputs (the serialized model architecture and weights) rather than float8 features and labels.

In [None]:
%%sql
drop function if exists add_model_to_GD(
    text,
    text,
    text[],
    text,
    text
) cascade;
create or replace function add_model_to_GD(
    key text, -- aggregate state: GD key holding the accumulated rows; NULL for the first row
    table_name text, -- label used as the GD key (e.g. 'results_train')
    header text[], -- names of the two columns, e.g. ARRAY['model_architecture', 'model_weights']
    features text, -- first column value (e.g. model architecture JSON)
    label text -- second column value (e.g. serialized model weights)
)
returns text
as
$$
    # State transition function for add_model_to_GD_agg.
    # Appends [features, label] to the list GD[table_name] and returns table_name
    # as the new aggregate state; the header is kept under '<table_name>_header'.
    if not key:
        # Empty state => a new aggregation started. Reset the rows and header
        # instead of extending rows left in GD by an earlier query.
        GD[table_name + '_header'] = header
        GD[table_name] = [[features, label]]
        return table_name

    GD[key].append([features, label])
    return key
$$language plpythonu;

drop aggregate if exists add_model_to_GD_agg(
    text, -- table name
    text[], -- header (column names)
    text, -- first column (model architecture)
    text -- second column (model weights)
) cascade;
create aggregate add_model_to_GD_agg(
        text, -- table name
        text[], -- header (column names)
        text, -- first column (model architecture)
        text -- second column (model weights)
    )
(
    SFUNC = add_model_to_GD,
    STYPE = text -- the key in GD used to hold the data across calls
);

Call the previously defined keras_predict function from SQL, passing in the serialized model and test data via GD. The query returns the hostname, loss, and accuracy as a single result row. The NVIDIA Tesla K80 GPU is used for prediction if tensorflow-gpu is installed and configured.

In [None]:
%%sql
-- Evaluate the GPU-trained model on the MNIST test set. The training cells create
-- results_gpu_train / results_cpu_train (swap in results_cpu_train to test that model).
-- 'results_train' is only the label under which the model row is stored in GD.
select keras_predict(model_data, stacked_test_keys) as results_test
from
    (select add_model_to_GD_agg('results_train', ARRAY['model_architecture', 'model_weights'], model_architecture, model_weights)
        as model_data
        from
            results_gpu_train
        )q1,
    (select add_data_to_GD_agg('mnist_test', ARRAY['x', 'y'], x, y)
        as stacked_test_keys
        from 
            mnist_test
        )q2;

In [None]:
# Closing banner for the notebook run.
rule_line = '-------------------------------------------------------------------------------------------------------------------'
for banner_line in (rule_line, 'End of deep learning GPU spike', rule_line):
    print(banner_line)