# Consensus ADMM for massive data
## using scaled form, stopping criteria and MPI

In [None]:
if(!require(data.table)) install.packages("data.table")
require(data.table)
if(!require(Rmpi)) install.packages("Rmpi")
require(Rmpi)

# 1. slaves spawning
mpi.spawn.Rslaves(nslaves = 10)

In [None]:
# 2. parameter setting
k = 10 # number of slaves(machines)
d = 100 # number of data
n = 200 
p = 40
eta = 0.1 # learning rate
lo = 5
epochs = 10000
iterations = 100
e_abs = 0.001
e_rel = 0.0001
running_time = c()
global_objs = list(k = k, d = d, n = n, p = p, eta = eta, lo = lo, iterations = iterations) # for broadcasting

# 3. parameters for master
true_beta = c(rep(1,5), rep(0,p-5))
set.seed(960520)
global_beta = matrix(rnorm(p), p, 1)

# 4. parameters(beta, dual) for slaves
local_beta = list()
local_dual = list()
for(i in 1:d)
{
  set.seed(i * 520)
  local_beta[[i]] = rnorm(p)
  set.seed(i * 960)
  local_dual[[i]] = rnorm(p)
}

In [None]:
# 0. data generation
# for(i in 1:d)
# {
#     set.seed(i)
#     x = matrix(rnorm(n*p), n, p)
#     y = rbinom(n, size = 1, prob = 1 / (1 + exp(-x %*% true_beta)))
#     fwrite(cbind(x, y), file = paste('/home/jeon/an/admm_massive/data/data', i, '.csv', sep = ""))
# }

In [None]:
# 5. function for stopping criteria
e_pri_fun = function(p, e_abs, e_rel, x, z)
{
  x_mat = matrix(numeric(0), p, 0)
  for(i in 1:d) x_mat = cbind(x_mat, x[[i]])
  return(sqrt(p) * e_abs + e_rel * max(norm(x_mat, "F"), norm(z, "F")))
}
e_dual_fun = function(p, e_abs, e_rel, u)
{
  u_mat = matrix(numeric(0), p, 0)
  for(i in 1:d) u_mat = cbind(u_mat, u[[i]])
  return(sqrt(p) * e_abs + e_rel * norm(u_mat, "F"))
}

In [None]:
# 6. broadcasting
mpi.bcast.Robj2slave(global_objs)
mpi.bcast.Robj2slave(local_beta)
mpi.bcast.Robj2slave(local_dual)
mpi.bcast.Robj2slave(global_beta)

In [None]:
# initial_time = as.numeric(format(Sys.time(), "%s"))
# 7. communication
mpi.bcast.cmd(cmd = source("/home/jeon/an/admm_massive/slavefunctions_logistic_consensus.R"))
for(t in 1:epochs)
{
  for(i in 1:(d/k))
  {
    # update local beta
    mpi.bcast.Robj2slave(i)
    b_slaves = mpi.remote.exec(cmd = slavefunction(mpi.comm.rank()))
    # save partial local beta
    for(j in 1:k)
    {
      local_beta[[(d/k) * (j - 1) + i]] = b_slaves[[j]]
    }
  }
  # update global beta
  sum = rep(0, p)
  for(i in 1:d)
  {
    sum = sum + local_beta[[i]] + local_dual[[i]]
  }
  old_beta = global_beta
  global_beta = sum / d
  # update dual variables 
  for(i in 1:d)
  {
    local_dual[[i]] = local_dual[[i]] + local_beta[[i]] - global_beta
  }
  # running_time = c(running_time, as.numeric(format(Sys.time(), "%s")) - initial_time)
  fwrite(as.matrix(global_beta), file = paste('/home/jeon/an/admm_massive/beta/beta_', t, '.csv', sep=""))
  # check stopping criteria
  e_pri = e_pri_fun(p, e_abs, e_rel, local_beta, global_beta)
  e_dual = e_dual_fun(p, e_abs, e_rel, local_dual)
  b_mat = matrix(numeric(0), p, 0)
  for(i in 1:d)
  {
    b_mat = cbind(b_mat, local_beta[[i]])
  }
  r = b_mat - global_beta %*% matrix(rep(1, d), 1, d)
  s = as.matrix(lo * (global_beta - old_beta))
  if(t %% 5 == 0) cat('1. primal residual:', norm(r, "F"), '2. dual residual:', norm(s, 'F'), '\n')
  if(norm((r), "F") < e_pri && norm((s), "F") < e_dual) break
  # broadcast updated parameters
  mpi.bcast.Robj2slave(local_beta)
  mpi.bcast.Robj2slave(local_dual)
  mpi.bcast.Robj2slave(global_beta)
}

In [None]:
# 8. integrating global beta
total_beta = matrix(numeric(0), p, 0)
for(i in 1:t)
{
  total_beta = cbind(total_beta, as.matrix(fread(file = paste('/home/jeon/an/admm_massive/beta/beta_', i, '.csv', sep=""))))
}

In [None]:
# 9. loss computing
loss = rep(0, t)
for(i in 1:d)
{
  data = as.matrix(fread(file = paste('/home/jeon/an/admm_massive/data/data', i, '.csv', sep=""), header = T))
  x = data[ ,1:p]
  y = data[ ,(p + 1)]
  loss = loss - t(y) %*% x %*% total_beta + colSums(log(1 + exp(x %*% total_beta)))
}
fwrite(as.matrix(loss), file = paste('/home/jeon/an/admm_massive/admm_massive_loss.csv', sep=""))


In [None]:
# 10. save loss~running_time plot
png(file = "/home/jeon/an/admm_massive/admm_massive_loss_plot.png")
# running_time = as.matrix(fread(file = '/home/jeon/an/admm/running_time.csv', header = T))
# plot(running_time, loss)
plot(seq(1,t,1), loss)
dev.off()

In [None]:
# 11. close slaves
mpi.close.Rslaves()