In [1]:
import matplotlib.pyplot as plt

import threading

import os
import time

import pandas as pd
import numpy as np

import tensorflow as tf
from tensorflow.python.client import device_lib

from sklearn.datasets import make_classification

def get_available_gpus():
    local_device_protos = device_lib.list_local_devices()
    return [x.name for x in local_device_protos if x.device_type == 'GPU']

def make_data(filepath, n_obs, n_dim, seed, K):

    try:
        os.remove(filepath)
    except:
        print('file not found')
    finally:
        (X, Y) = make_classification(n_samples            = n_obs    , 
                                     n_features           = n_dim    ,
                                     n_informative        = n_dim    ,
                                     n_redundant          = 0        ,
                                     n_classes            = K        ,
                                     n_clusters_per_class = 1        ,
                                     shuffle              = True     ,
                                     class_sep            = 1.5      ,
                                     random_state         = seed      )
        
        np.savez(filepath, X=X, Y=Y)

    return True

  return f(*args, **kwds)


In [2]:
n_obs = 200000000
n_dim = 2
K     = 3
GPU_names = get_available_gpus()
n_max_iters = 20
seed = 800594

In [None]:
make_data('test-data.npz', n_obs, n_dim, seed, K)

In [3]:
with np.load('test-data.npz') as data:
    data_X = data['X']
    data_Y = data['Y']

    
maxsize = 2 * 1024 * 1024 * 1024
size_of_each = data_X.shape[1] * data_X.dtype.itemsize

initial_centers = data_X[0:K, :]

In [None]:
class GpuParamServerDeviceSetter(object):
    """Used with tf.device() to place variables on the least loaded GPU.
    
    A common use for this class is to pass a list of GPU devices, e.g. ['gpu:0',
    'gpu:1','gpu:2'], as ps_devices.  When each variable is placed, it will be
    placed on the least loaded gpu. All other Ops, which will be the computation
    Ops, will be placed on the worker_device.
    """

    def __init__(self, worker_device, ps_devices):
        """Initializer for GpuParamServerDeviceSetter
        Args:
        worker_device: the device to use for computation Ops.
        ps_devices: a list of devices to use for Variable Ops. Each variable is
        assigned to the least loaded device.
        """
        self.ps_devices = ps_devices
        self.worker_device = worker_device
        self.ps_sizes = [0] * len(self.ps_devices)

    def __call__(self, op):
        if op.device:
            return op.device
        if op.type not in ['Variable', 'VariableV2', 'VarHandleOp']:
            return self.worker_device

        # Gets the least loaded ps_device
        device_index, _ = min(enumerate(self.ps_sizes), key=operator.itemgetter(1))
        device_name = self.ps_devices[device_index]
        var_size = op.outputs[0].get_shape().num_elements()
        self.ps_sizes[device_index] += var_size
        
        return device_name

def _create_device_setter(is_cpu_ps, worker, num_gpus):
    """Create device setter object."""
    if is_cpu_ps:
        # tf.train.replica_device_setter supports placing variables on the CPU, all
        # on one GPU, or on ps_servers defined in a cluster_spec.
        return tf.train.replica_device_setter(
            worker_device=worker, ps_device='/cpu:0', ps_tasks=1)
    else:
        gpus = ['/gpu:%d' % i for i in range(num_gpus)]
        return GpuParamServerDeviceSetter(worker, gpus)
        

In [None]:
def make_batch(dataset, batch_size):
    dataset = dataset.batch(batch_size)
    iterator = dataset.make_one_shot_iterator()
    data_batch = iterator.get_next()
    
def tower_fn(data_batch, K, initial_centers, max_iters):
    ####
    # In the coments we denote :
    # => N = Number of Observations
    # => M = Number of Dimensions
    # => K = Number of Centers
    ####
    
    # Reshapes rep_centroids and rep_points to format N x K x M so that 
    # the 2 matrixes have the same size
    rep_centroids = tf.reshape(tf.tile(global_centroids, [N, 1]), [N, K, M])
    rep_points = tf.reshape(tf.tile(X, [1, K]), [N, K, M])

    # Calculates sum_squares, a matrix of size N x K
    # This matrix is not sqrt((X-Y)^2), it is just(X-Y)^2
    # Since we need just the argmin(sqrt((X-Y)^2)) wich is equal to 
    # argmin((X-Y)^2), it would be a waste of computation
    sum_squares = tf.reduce_sum(tf.square(tf.subtract( rep_points, rep_centroids) ), axis = 2)

    # Use argmin to select the lowest-distance point
    # This gets a matrix of size N x 1
    best_centroids = tf.argmin(sum_squares, axis = 1)
                    
    means = []
    for c in range(K):
        means.append(
            tf.reduce_mean(
                tf.gather(X, tf.reshape(tf.where(tf.equal(best_centroids, c)), [1,-1])), axis=[1]))

        
    new_centroids = tf.concat(means, 0)
    
    return best_centroids, new_centroids

def input_fn(data, batch_size, num_gpus):
    dataset = tf.data.Dataset.from_tensor_slices(data)
    dataset = dataset.batch(batch_size)
    
    iterator = dataset.make_one_shot_iterator()
    data_iterator = iterator.get_next()
    
    dataset = tf.unstack(dataset, num=batch_size, axis=0)
    dataset_shards = [[] for i in range(num_gpus)]
    
    for i in range(batch_size):
        idx = i % num_gpus
        dataset_shards[idx].append(dataset[i])
        
    dataset_shards = [tf.paralel_stack(x) for x in dataset_shards]
    return dataset_shards

In [4]:
def distributed_kmeans(GPU_names, num_items, initial_centers, batch_data, max_iters):
    partial_directions = []
    partial_values = []
    partial_results = []
    
    with tf.name_scope('training_data'):
        with tf.device('/cpu:0'):
            parts = batch_data.shard(batch_data, len(GPU_names), 0)

            global_centroids = tf.Variable(initial_centers, name="global_centroids")
    
    tf.global_variables_initializer().run()
    
    is_cpu_ps = True
    
    for i in range(len(GPU_names)):
        worker = GPU_names[i]
        
        device_setter = _create_device_setter(is_cpu_ps, worker, len(GPU_names))
        
        (X) = batch_data
        (N, M) = X.get_shape().as_list()
        
        with tf.variable_scope('training_data', reuse=bool(i != 0)):
            with tf.name_scope('tower_%d' % i) as name_scope:
                
                with tf.device(device_setter):
                    
                    ####
                    # In the coments we denote :
                    # => N = Number of Observations
                    # => M = Number of Dimensions
                    # => K = Number of Centers
                    ####

                    # Reshapes rep_centroids and rep_points to format N x K x M so that 
                    # the 2 matrixes have the same size
                    rep_centroids = tf.reshape(tf.tile(global_centroids, [N, 1]), [N, K, M])
                    rep_points = tf.reshape(tf.tile(X, [1, K]), [N, K, M])

                    # Calculates sum_squares, a matrix of size N x K
                    # This matrix is not sqrt((X-Y)^2), it is just(X-Y)^2
                    # Since we need just the argmin(sqrt((X-Y)^2)) wich is equal to 
                    # argmin((X-Y)^2), it would be a waste of computation
                    sum_squares = tf.reduce_sum(tf.square(tf.subtract( rep_points, rep_centroids) ), axis = 2)

                    # Use argmin to select the lowest-distance point
                    # This gets a matrix of size N x 1
                    best_centroids = tf.argmin(sum_squares, axis = 1)
                    
                    means = []
                    for c in range(K):
                        means.append(
                            tf.reduce_mean(
                                tf.gather(X, tf.reshape(tf.where(tf.equal(best_centroids, c)), [1,-1])), axis=[1]))

                    new_centroids = tf.concat(means, 0)
                    
                with tf.device('/cpu:0'):
                    y_count = tf.cast(
                        tf.bincount(tf.to_int32(best_centroids), maxlength = K, minlength = K), dtype = tf.float64)
            
                    partial_mu =  tf.multiply( tf.transpose(new_centroids), y_count )

                    partial_directions.append( y_count )
                    partial_values.append( partial_mu )
                    
    with tf.name_scope('aggregation') :
        with tf.device('/cpu:0') :
            sum_direction = tf.add_n( partial_directions )
            sum_mu = tf.add_n( partial_values )

            rep_sum_direction = tf.reshape(tf.tile(sum_direction, [M]), [M, K])
            new_centers = tf.transpose( tf.div(sum_mu, rep_sum_direction) )

            update_centroid = tf.group( global_centroids.assign(new_centers) )
    
    for i in range(n_max_iters):
        [result, _] = [global_centroids, update_centroid]
            
    return result

In [5]:
data_placeholder = tf.placeholder(data_X.dtype, data_X.shape)

dataset = tf.data.Dataset.from_tensor_slices(data_placeholder)
num_items = np.floor(maxsize / size_of_each)
dataset = dataset.batch(num_items)

iterator = dataset.make_initializable_iterator()
next_element = iterator.get_next()

config = tf.ConfigProto( allow_soft_placement = True )
config.gpu_options.allow_growth = True
config.gpu_options.allocator_type = 'BFC'

init = tf.global_variables_initializer()


with tf.Session(config = config) as sess:
    sess.run(iterator.initializer, feed_dict={data_placeholder: data_X})
    print(next_element)

    print('initial: ', initial_centers)
    while True:
        try:
            item = sess.run(distributed_kmeans(GPU_names, num_items, initial_centers, next_element, n_max_iters))
        except tf.errors.OutOfRangeError:
            break

Tensor("IteratorGetNext:0", shape=(?, 2), dtype=float64)
initial:  [[-0.75325731 -2.2329732 ]
 [ 1.23589105  1.48353633]
 [ 1.35603883  1.57902516]]


KeyboardInterrupt: 