In [1]:
import pandas as pd
import sklearn
import sklearn.linear_model
import sklearn.metrics
import scipy.special
import numpy as np
import time
import math
from tqdm import tqdm as tqdm

import ray

In [15]:
def predict(Xs, theta):
    z = np.inner(Xs,theta)
    h = 1/(1+np.exp(-z))
    return h

In [95]:
def calc_grad(X, y, theta):
    z = np.dot(X,theta)
    h = 1/(1+np.exp(-z))
    grd = np.dot(X.T, h-y)/y.shape[0]
    return grd

In [58]:
class AdamOptimizer:
    def __init__(self, ndims, lr=.1):
        self.eps = 1e-8
        self.b1 = 0.9
        self.b2 = 0.999
        self.lr = lr
        self.ndims = ndims
        
        self.mt = np.zeros(shape=(ndims))
        self.vt = np.zeros(shape=(ndims))
        self.theta = np.zeros(shape=(ndims))
        self.t = 1
        self.b1t = 1
        self.b2t = 1

    def step(self, grad):
        b1 = self.b1
        b2 = self.b2
        
        self.mt *= b1
        self.mt += (1-b1)*grad
        self.vt *= b2
        self.vt += (1-b2)*(grad*grad)
        self.b1t *= b1
        self.b2t *= b2
        
        at = (self.lr/math.sqrt(self.t))*np.sqrt(1-self.b2t)/(1-self.b1t)
        self.theta -= at*self.mt/(np.sqrt(self.vt)+self.eps)
        self.t += 1
        return self.theta

    def get_params(self):
        return self.theta

# Parameter Actor Server

In [59]:
@ray.remote
class AdamParameterServer(object):
    def __init__(self, dim, lr):
        self.opt = AdamOptimizer(dim, lr=lr)

    def get_params(self):
        return self.opt.get_params()

    def update_params(self, grad):
        self.opt.step(grad)

In [60]:
@ray.remote
def gradient_worker(ps, X, y, batch_size):
    n_batches = X.shape[0] // batch_size
    start_idx = 0
    
    for batch_idx in range(n_batches):
        X_b = X[start_idx:start_idx+batch_size]
        y_b = y[start_idx:start_idx+batch_size]
        cur_theta = ray.get(ps.get_params.remote())
        cur_grad = calc_grad(X_b, y_b, cur_theta)
        ps.update_params.remote(cur_grad)

        start_idx += batch_size

In [72]:
def train_remote(X, y, num_processes, batch_size):
    X_parts = np.array_split(X, num_processes)
    y_parts = np.array_split(y, num_processes)
    X_ids = [ray.put(X_part) for X_part in X_parts]
    y_ids = [ray.put(y_part) for y_part in y_parts]
    
    ps = AdamParameterServer.remote(dim=X.shape[1], lr=.1)
    
    start_time = time.time()
    workers = [
        gradient_worker.remote(ps, X_ids[i], y_ids[i], batch_size)
        for i in range(num_processes)
    ]
    worker_res = ray.get(workers)
    end_time = time.time()
    print("Elapsed Time: {}".format(end_time - start_time))
    return ray.get(ps.get_params.remote())


In [88]:
ray.shutdown()

In [89]:
ray.init(num_cpus=7)

2019-11-09 00:36:42,148	INFO resource_spec.py:205 -- Starting Ray with 21.68 GiB memory available for workers and up to 10.85 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).


{'node_ip_address': '10.138.15.252',
 'redis_address': '10.138.15.252:30047',
 'object_store_address': '/tmp/ray/session_2019-11-09_00-36-42_146872_3957/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2019-11-09_00-36-42_146872_3957/sockets/raylet',
 'webui_url': None,
 'session_dir': '/tmp/ray/session_2019-11-09_00-36-42_146872_3957'}

In [93]:
theta = train_remote(Xc, y, num_processes=4, batch_size=20000)

Elapsed Time: 1.716245174407959


In [94]:
yh = predict(Xc, opt.get_params())
score = sklearn.metrics.log_loss(y, yh)
print("Log Loss: {}".format(score))

Log Loss: 0.45167819919278257


In [71]:
ray.timeline("out.timeline")

# Synchronous Updates

In [12]:
@ray.remote
def calc_gradient(X, y, theta, start_idx, end_idx):
    X = X[start_idx:end_idx]
    y = y[start_idx:end_idx]
    
    z = np.dot(X,theta)
    h = 1/(1+np.exp(-z))
    grd = np.dot(X.T, h-y)/y.shape[0]
    return grd

In [22]:
class DistributedLR:
    def __init__(
        self, 
        num_processes=8,
    ):
        self.eps = 1e-8
        self.b1 = 0.9
        self.b2 = 0.999
        self.num_processes = num_processes
    
    def train(
        self,
        X, y,
        lr, batch_size, num_epochs=1
    ):
        ndims = X.shape[1]
        X_parts = np.array_split(X, self.num_processes)
        y_parts = np.array_split(y, self.num_processes)
        X_ids = [ray.put(X_part) for X_part in X_parts]
        y_ids = [ray.put(y_part) for y_part in y_parts]
        num_batches = int(math.ceil(X.shape[0] / (self.num_processes*batch_size)))
        
        t1 = time.time()
        theta = np.zeros(shape=(ndims))
        mt = np.zeros(shape=(ndims))
        vt = np.zeros(shape=(ndims))
        b1 = self.b1
        b2 = self.b2
        eps = self.eps
        b1t = b1
        b2t = b2

        start_idx = 0
        t = 1
        for epoch_idx in range(num_epochs):
            for batch_idx in tqdm(range(num_batches)):
                theta_id = ray.put(theta)
                grads = [
                    calc_gradient.remote(
                        X_ids[i], y_ids[i], theta_id,
                        start_idx = batch_idx*batch_size,
                        end_idx = (batch_idx+1)*batch_size
                    ) 
                    for i in range(self.num_processes)
                ]
                grad_values = ray.get(grads)
                cur_grad = np.mean(grad_values, axis=0)

                mt *= b1
                mt += (1-b1)*cur_grad
                vt *= b2
                vt += (1-b2)*(cur_grad*cur_grad)
                at = (lr/math.sqrt(t))*np.sqrt(1-b2t)/(1-b1t)
                theta -= at*mt/(np.sqrt(vt)+eps)

                start_idx += batch_size
                b1t *= b1
                b2t *= b2
                t += 1

        t2 = time.time()
        print("Total Time: {}".format(t2-t1))
        return theta


In [40]:
@ray.remote
class GradientWorker:
    def __init__(
        self,
        X, y
    ):
        self.X = X
        self.y = y
    
    def calc_grad(
        self,
        theta,
        start_idx,
        end_idx
    ):
        X = self.X[start_idx:end_idx]
        y = self.y[start_idx:end_idx]
        z = np.dot(X,theta)
        h = 1/(1+np.exp(-z))
        grd = np.dot(X.T, h-y)/y.shape[0]
        return grd

In [41]:
class DistributedActorLR:
    def __init__(
        self, 
        num_processes=8,
    ):
        self.eps = 1e-8
        self.b1 = 0.9
        self.b2 = 0.999
        self.num_processes = num_processes
    
    def train(
        self,
        X, y,
        lr, batch_size, num_epochs=1
    ):
        ndims = X.shape[1]
        X_parts = np.array_split(X, self.num_processes)
        y_parts = np.array_split(y, self.num_processes)
        X_ids = [ray.put(X_part) for X_part in X_parts]
        y_ids = [ray.put(y_part) for y_part in y_parts]
        num_batches = int(math.ceil(X.shape[0] / (self.num_processes*batch_size)))
        workers = [GradientWorker.remote(X_ids[i], y_ids[i]) for i in range(self.num_processes)]
        
        t1 = time.time()
        theta = np.zeros(shape=(ndims))
        mt = np.zeros(shape=(ndims))
        vt = np.zeros(shape=(ndims))
        b1 = self.b1
        b2 = self.b2
        eps = self.eps
        b1t = b1
        b2t = b2

        start_idx = 0
        t = 1
        for epoch_idx in range(num_epochs):
            for batch_idx in tqdm(range(num_batches)):
                theta_id = ray.put(theta)
                grads = [
                    cur_worker.calc_grad.remote(
                        theta_id,
                        start_idx = batch_idx*batch_size,
                        end_idx = (batch_idx+1)*batch_size
                    ) 
                    for cur_worker in workers
                ]
                grad_values = ray.get(grads)
                cur_grad = np.mean(grad_values, axis=0)

                mt *= b1
                mt += (1-b1)*cur_grad
                vt *= b2
                vt += (1-b2)*(cur_grad*cur_grad)
                at = (lr/math.sqrt(t))*np.sqrt(1-b2t)/(1-b1t)
                theta -= at*mt/(np.sqrt(vt)+eps)

                start_idx += batch_size
                b1t *= b1
                b2t *= b2
                t += 1

        t2 = time.time()
        print("Total Time: {}".format(t2-t1))
        return theta


In [12]:
@ray.remote
class AdamWorker:
    def __init__(
        self,
        ndims
    ):        
        self.eps = 1e-8
        self.b1 = 0.9
        self.b2 = 0.999
        self.mt = np.zeros(shape=(ndims))
        self.vt = np.zeros(shape=(ndims))
        self.t = 1

    def calc_grad(
        self,
        X, y,
        theta
    ):
        z = np.dot(X,theta)
        h = 1/(1+np.exp(-z))
        grd = np.dot(X.T, h-y)/y.shape[0]
        return grd

    def run_epoch(
        self,
        X, y,
        theta,
        lr, batch_size,
    ):
        b1 = self.b1
        b2 = self.b2
        eps = self.eps

        start_idx = 0
        num_batches = int(math.ceil(X.shape[0] / batch_size))
        for batch_idx in range(num_batches):
            cur_grad = self.calc_grad(
                X=X[start_idx:start_idx+batch_size],
                y=y[start_idx:start_idx+batch_size],
                theta=theta,
            )
            self.mt *= b1
            self.mt += (1-b1)*cur_grad
            self.vt *= b2
            self.vt += (1-b2)*(cur_grad*cur_grad)
            b1t = math.pow(b1, self.t)
            b2t = math.pow(b2, self.t)
            at = (lr/math.sqrt(self.t))*np.sqrt(1-b2t)/(1-b1t)
            theta -= at*self.mt/(np.sqrt(self.vt)+eps)

            start_idx += batch_size
            self.t += 1
        return theta

In [22]:
class DistributedAdamActorLR:
    def __init__(
        self, 
        num_processes=4,
    ):
        self.num_processes = num_processes
    
    def train(
        self,
        X, y,
        lr, batch_size, num_epochs=1
    ):
        ndims = X.shape[1]
        X_parts = np.array_split(X, self.num_processes)
        y_parts = np.array_split(y, self.num_processes)
        X_ids = [ray.put(X_part) for X_part in X_parts]
        y_ids = [ray.put(y_part) for y_part in y_parts]
        num_batches = int(math.ceil(X.shape[0] / (self.num_processes*batch_size)))
        workers = [AdamWorker.remote(ndims) for i in range(self.num_processes)]
        theta = np.zeros(shape=(ndims))
        
        t1 = time.time()
        start_idx = 0
        for epoch_idx in range(num_epochs):
            theta_ids = [
                workers[i].run_epoch.remote(
                    X_ids[i], y_ids[i],
                    theta, lr, batch_size
                )
                for i in range(self.num_processes)
            ]
            thetas = ray.get(theta_ids)
            theta = np.mean(thetas, axis=0)
        t2 = time.time()
        print("Total Time: {}".format(t2-t1))
        return thetas[0]

# Data Loading

In [6]:
df = pd.read_feather("~/data/avazu/train_10M.feather")

In [7]:
target = "click"
CAT_COLS = [
    "C1", "banner_pos", 
    "site_category", "app_category", 
    "device_type", "device_conn_type",
]
df_enc = pd.get_dummies(df[CAT_COLS], columns=CAT_COLS)
df_final = pd.concat([
    df[target], df_enc
], axis=1)

nrows = 4_000_000
np.random.seed(0)
r_order = np.random.permutation(nrows)
Xs = df_enc.values[:nrows][r_order]
y = df[target].values[:nrows][r_order]

Xc = np.concatenate([
    np.repeat(1, repeats=Xs.shape[0]).reshape(-1,1),
    Xs
], axis=1)

# Local Training

In [96]:
n_rows = len(Xc)
start_idx = 0
t = 1
lr = 0.1
start_time = time.time()
opt = AdamOptimizer(Xc.shape[1], lr=lr)
batch_size = 20000
for batch_idx in tqdm(range(n_rows // batch_size)):
    X_b = Xc[start_idx:start_idx+batch_size]
    y_b = y[start_idx:start_idx+batch_size]
    cur_grad = calc_grad(X_b, y_b, opt.get_params())
    opt.step(cur_grad)
    
    start_idx += batch_size
end_time = time.time()
print("Total Time: {}".format(end_time - start_time))

100%|██████████| 200/200 [00:01<00:00, 116.87it/s]

Total Time: 1.7173857688903809





In [83]:
yh = predict(Xc, opt.get_params())
score = sklearn.metrics.log_loss(y, yh)
print("Log Loss: {}".format(score))

Log Loss: 0.45167819919278257


# Training

In [16]:
ray.shutdown()

In [17]:
ray.init(num_cpus=4)

2019-11-08 00:47:49,856	INFO resource_spec.py:205 -- Starting Ray with 23.44 GiB memory available for workers and up to 11.74 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).


{'node_ip_address': '10.138.15.252',
 'redis_address': '10.138.15.252:48027',
 'object_store_address': '/tmp/ray/session_2019-11-08_00-47-49_854898_7223/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2019-11-08_00-47-49_854898_7223/sockets/raylet',
 'webui_url': None,
 'session_dir': '/tmp/ray/session_2019-11-08_00-47-49_854898_7223'}

In [57]:
lr = DistributedLR(
    num_processes=4
)
theta = lr.train(
    Xc, y, 
    lr=.1, 
    batch_size=20000, num_epochs=2
)

100%|██████████| 50/50 [00:01<00:00, 27.99it/s]
100%|██████████| 50/50 [00:01<00:00, 26.25it/s]

Total Time: 3.707850217819214





In [58]:
lr = DistributedActorLR(
    num_processes=4
)
theta = lr.train(
    Xc, y, 
    lr=.1, 
    batch_size=20000, num_epochs=2
)

100%|██████████| 50/50 [00:01<00:00, 27.86it/s]
100%|██████████| 50/50 [00:01<00:00, 27.66it/s]

Total Time: 3.6107382774353027





In [23]:
lr = DistributedAdamActorLR(
    num_processes=4
)
theta = lr.train(
    Xc, y, 
    lr=.1, 
    batch_size=1024, num_epochs=1
)

Total Time: 3.728069305419922


In [19]:
ray.timeline("timeline.json")

In [24]:
yh = predict(Xc, theta)
score = sklearn.metrics.log_loss(y, yh)
print("Log Loss: {}".format(score))

Log Loss: 0.45167728815928004
