Skip to content
Permalink
Browse files
Standardize bsp synchronization between master and worker (#48)
  • Loading branch information
houzhizhen committed May 13, 2021
1 parent 930f5dd commit a125f78aae966640e452e5e1ed1aa41d5f1cd261
Showing 9 changed files with 341 additions and 128 deletions.
@@ -44,18 +44,18 @@ public Bsp4Master(Config config) {
/**
* Register Master, workers can get master information.
*/
public void registerMaster(ContainerInfo masterInfo) {
String path = this.constructPath(BspEvent.BSP_MASTER_REGISTERED);
public void masterInitDone(ContainerInfo masterInfo) {
String path = this.constructPath(BspEvent.BSP_MASTER_INIT_DONE);
this.bspClient().put(path, SerializeUtil.toBytes(masterInfo));
LOG.info("Master is registered, master info: {}", masterInfo);
LOG.info("Master init-done, master info: {}", masterInfo);
}

/**
* Wait workers registered.
*/
public List<ContainerInfo> waitWorkersRegistered() {
LOG.info("Master is waiting for workers registered");
String path = this.constructPath(BspEvent.BSP_WORKER_REGISTERED);
public List<ContainerInfo> waitWorkersInitDone() {
LOG.info("Master is waiting for workers init-done");
String path = this.constructPath(BspEvent.BSP_WORKER_INIT_DONE);
List<byte[]> serializedContainers = this.waitOnWorkersEvent(
path, this.registerTimeout());
List<ContainerInfo> containers = new ArrayList<>(this.workerCount());
@@ -64,19 +64,21 @@ public List<ContainerInfo> waitWorkersRegistered() {
SerializeUtil.fromBytes(serializedContainer, container);
containers.add(container);
}
LOG.info("Master waited all workers registered, workers: {}",
LOG.info("Master waited all workers init-done, workers: {}",
containers);
this.assignIdForWorkers(containers);
this.masterAllInitDone(containers);
return containers;
}

/**
* The master determines which superstep to start from
*/
public void masterSuperstepResume(int superstep) {
String path = this.constructPath(BspEvent.BSP_MASTER_SUPERSTEP_RESUME);
public void masterResumeDone(int superstep) {
String path = this.constructPath(BspEvent.BSP_MASTER_RESUME_DONE);
IntValue superstepWritable = new IntValue(superstep);
this.bspClient().put(path, SerializeUtil.toBytes(superstepWritable));
LOG.info("Master set superstep-resume({})", superstep);
LOG.info("Master set resume-done({})", superstep);
}

/**
@@ -106,9 +108,9 @@ public void masterInputDone() {
* check the max iteration count, and then calls masterSuperstepDone to
* synchronize superstep result.
*/
public List<WorkerStat> waitWorkersSuperstepDone(int superstep) {
public List<WorkerStat> waitWorkersStepDone(int superstep) {
LOG.info("Master is waiting for workers superstep-done({})", superstep);
String path = this.constructPath(BspEvent.BSP_WORKER_SUPERSTEP_DONE,
String path = this.constructPath(BspEvent.BSP_WORKER_STEP_DONE,
superstep);
List<byte[]> list = this.waitOnWorkersEvent(path,
this.barrierOnWorkersTimeout());
@@ -128,21 +130,43 @@ public List<WorkerStat> waitWorkersSuperstepDone(int superstep) {
* call masterPrepareSuperstepDone to let the workers know that master is
* prepared done.
*/
public void waitWorkersSuperstepPrepared(int superstep) {
LOG.info("Master is waiting for workers superstep-prepared({})",
public void waitWorkersStepPrepareDone(int superstep) {
LOG.info("Master is waiting for workers superstep-prepare-done({})",
superstep);
String path = this.constructPath(BspEvent.BSP_WORKER_SUPERSTEP_PREPARED,
String path = this.constructPath(BspEvent.BSP_WORKER_STEP_PREPARE_DONE,
superstep);
this.waitOnWorkersEvent(path, this.barrierOnWorkersTimeout());
LOG.info("Master waited workers superstep-prepared");
LOG.info("Master waited workers superstep-prepare-done");
}

/**
* Master signals the workers that the master superstep prepared.
* Master signals the workers that the master superstep prepare-done.
*/
public void masterSuperstepPrepared(int superstep) {
LOG.info("Master set superstep-prepared({})", superstep);
String path = this.constructPath(BspEvent.BSP_MASTER_SUPERSTEP_PREPARED,
public void masterStepPrepareDone(int superstep) {
LOG.info("Master set superstep-prepare-done({})", superstep);
String path = this.constructPath(BspEvent.BSP_MASTER_STEP_PREPARE_DONE,
superstep);
this.bspClient().put(path, Constants.EMPTY_BYTES);
}

/**
* Wait all workers finish computation of specified superstep.
*/
public void waitWorkersStepComputeDone(int superstep) {
LOG.info("Master is waiting for workers superstep-compute-done({})",
superstep);
String path = this.constructPath(BspEvent.BSP_WORKER_STEP_COMPUTE_DONE,
superstep);
this.waitOnWorkersEvent(path, this.barrierOnWorkersTimeout());
LOG.info("Master waited workers superstep-compute-done");
}

/**
* Master signals the workers that the all workers compute done.
*/
public void masterStepComputeDone(int superstep) {
LOG.info("Master set superstep-compute-done({})", superstep);
String path = this.constructPath(BspEvent.BSP_MASTER_STEP_COMPUTE_DONE,
superstep);
this.bspClient().put(path, Constants.EMPTY_BYTES);
}
@@ -151,9 +175,8 @@ public void masterSuperstepPrepared(int superstep) {
* Master signals the workers that superstep done. The workers read
* GraphStat and determines whether to continue iteration.
*/
public void masterSuperstepDone(int superstep,
SuperstepStat superstepStat) {
String path = this.constructPath(BspEvent.BSP_MASTER_SUPERSTEP_DONE,
public void masterStepDone(int superstep, SuperstepStat superstepStat) {
String path = this.constructPath(BspEvent.BSP_MASTER_STEP_DONE,
superstep);
this.bspClient().put(path, SerializeUtil.toBytes(superstepStat));
LOG.info("Master set superstep-done({}), graph stat: {}",
@@ -170,6 +193,16 @@ public void waitWorkersOutputDone() {
LOG.info("Master waited workers output-done");
}

/**
* Wait workers close the managers and exit first.
*/
public void waitWorkersCloseDone() {
LOG.info("Master is waiting for workers close-done");
String path = this.constructPath(BspEvent.BSP_WORKER_CLOSE_DONE);
this.waitOnWorkersEvent(path, this.barrierOnWorkersTimeout());
LOG.info("Master waited workers close-done");
}

public void clean() {
this.bspClient().clean();
LOG.info("Cleaned up the BSP data");
@@ -179,4 +212,17 @@ private List<byte[]> waitOnWorkersEvent(String prefix, long timeout) {
return this.bspClient().getChildren(prefix, this.workerCount(),
timeout, this.logInterval());
}

private void assignIdForWorkers(List<ContainerInfo> containers) {
// Assign worker id from 1.
for (int i = 0; i < containers.size(); i++) {
containers.get(i).id(i + 1);
}
}

private void masterAllInitDone(List<ContainerInfo> workers) {
String path = this.constructPath(BspEvent.BSP_MASTER_ALL_INIT_DONE);
this.bspClient().put(path, SerializeUtil.toBytes(workers));
LOG.info("Master set all-init-done, workers {}", workers);
}
}
@@ -19,7 +19,6 @@

package com.baidu.hugegraph.computer.core.bsp;

import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
@@ -47,26 +46,32 @@ public Bsp4Worker(Config config, ContainerInfo workerInfo) {
/**
* Register this worker, worker's information is passed by constructor.
*/
public void registerWorker() {
String path = this.constructPath(BspEvent.BSP_WORKER_REGISTERED,
this.workerInfo.id());
public void workerInitDone() {
/*
* Can't use workerInfo.id(), because the master does not assign
* worker id yet. The master assigns worker's id by signal
* BspEvent.BSP_MASTER_ALL_INIT_DONE. Worker get it through method
* {@link #waitMasterAllInitDone()}.
*/
String path = this.constructPath(BspEvent.BSP_WORKER_INIT_DONE,
this.workerInfo.uniqueName());
this.bspClient().put(path, SerializeUtil.toBytes(this.workerInfo));
LOG.info("Worker is registered: {}", this.workerInfo);
LOG.info("Worker is init-done: {}", this.workerInfo.uniqueName());
}

/**
* Wait master registered, get master's information includes hostname
* and port.
*/
public ContainerInfo waitMasterRegistered() {
LOG.info("Worker({}) is waiting for master registered",
public ContainerInfo waitMasterInitDone() {
LOG.info("Worker({}) is waiting for master init-done",
this.workerInfo.id());
String path = this.constructPath(BspEvent.BSP_MASTER_REGISTERED);
String path = this.constructPath(BspEvent.BSP_MASTER_INIT_DONE);
byte[] bytes = this.bspClient().get(path, this.registerTimeout(),
this.logInterval());
ContainerInfo masterInfo = new ContainerInfo();
SerializeUtil.fromBytes(bytes, masterInfo);
LOG.info("Worker({}) waited master registered: {}",
LOG.info("Worker({}) waited master init-done: {}",
this.workerInfo.id(), masterInfo);
return masterInfo;
}
@@ -75,22 +80,19 @@ public ContainerInfo waitMasterRegistered() {
* Get all workers information includes hostname and port the workers
* listen on.
*/
public List<ContainerInfo> waitWorkersRegistered() {
LOG.info("Worker({}) is waiting for master all-registered",
public List<ContainerInfo> waitMasterAllInitDone() {
LOG.info("Worker({}) is waiting for master all-init-done",
this.workerInfo.id());
// TODO: change to wait BSP_MASTER_ALL_REGISTERED
String path = this.constructPath(BspEvent.BSP_WORKER_REGISTERED);
List<byte[]> serializedContainers = this.bspClient().getChildren(
path, this.workerCount(),
this.registerTimeout(),
this.logInterval());
List<ContainerInfo> containers = new ArrayList<>(this.workerCount());
for (byte[] serializedContainer : serializedContainers) {
ContainerInfo container = new ContainerInfo();
SerializeUtil.fromBytes(serializedContainer, container);
containers.add(container);
}
LOG.info("Worker({}) waited master all-registered, workers: {}",
String path = this.constructPath(BspEvent.BSP_MASTER_ALL_INIT_DONE);
byte[] serializedContainers = this.bspClient().get(
path,
this.registerTimeout(),
this.logInterval());
List<ContainerInfo> containers = SerializeUtil.fromBytes(
serializedContainers,
ContainerInfo::new);
this.assignThisWorkerId(containers);
LOG.info("Worker({}) waited master all-init-done, workers: {}",
this.workerInfo.id(), containers);
return containers;
}
@@ -99,15 +101,15 @@ public List<ContainerInfo> waitWorkersRegistered() {
* The master set this signal to let workers knows the first superstep to
* start with.
*/
public int waitMasterSuperstepResume() {
LOG.info("Worker({}) is waiting for master superstep-resume",
public int waitMasterResumeDone() {
LOG.info("Worker({}) is waiting for master resume-done",
this.workerInfo.id());
String path = this.constructPath(BspEvent.BSP_MASTER_SUPERSTEP_RESUME);
String path = this.constructPath(BspEvent.BSP_MASTER_RESUME_DONE);
byte[] bytes = this.bspClient().get(path, this.barrierOnMasterTimeout(),
this.logInterval());
IntValue superstep = new IntValue();
SerializeUtil.fromBytes(bytes, superstep);
LOG.info("Worker({}) waited superstep-resume({})",
LOG.info("Worker({}) waited master resume-done({})",
this.workerInfo.id(), superstep.value());
return superstep.value();
}
@@ -140,35 +142,63 @@ public void waitMasterInputDone() {
* Worker set this signal to indicate the worker is ready to receive
* messages from other workers.
*/
public void workerSuperstepPrepared(int superstep) {
String path = this.constructPath(BspEvent.BSP_WORKER_SUPERSTEP_PREPARED,
public void workerStepPrepareDone(int superstep) {
String path = this.constructPath(BspEvent.BSP_WORKER_STEP_PREPARE_DONE,
superstep, this.workerInfo.id());
this.bspClient().put(path, Constants.EMPTY_BYTES);
LOG.info("Worker({}) set superstep-prepared({})",
LOG.info("Worker({}) set superstep-prepare-done({})",
this.workerInfo.id(), superstep);
}

/**
* After receive this signal, the worker can execute and send messages
* to other workers.
*/
public void waitMasterSuperstepPrepared(int superstep) {
LOG.info("Worker({}) is waiting for master superstep-prepared({})",
public void waitMasterStepPrepareDone(int superstep) {
LOG.info("Worker({}) is waiting for master superstep-prepare-done({})",
this.workerInfo.id(), superstep);
String path = this.constructPath(BspEvent.BSP_MASTER_STEP_PREPARE_DONE,
superstep);
this.bspClient().get(path, this.barrierOnMasterTimeout(),
this.logInterval());
LOG.info("Worker({}) waited master superstep-prepare-done({})",
this.workerInfo.id(), superstep);
}

/**
* Worker set this signal to indicate the worker has computed the
* vertices for specified superstep.
*/
public void workerStepComputeDone(int superstep) {
String path = this.constructPath(BspEvent.BSP_WORKER_STEP_COMPUTE_DONE,
superstep, this.workerInfo.id());
this.bspClient().put(path, Constants.EMPTY_BYTES);
LOG.info("Worker({}) set superstep-compute-done({})",
this.workerInfo.id(), superstep);
}

/**
* After receive this signal, it indicates that all workers have computed
* vertices for the superstep. The worker can calls after-superstep callback
* of managers.
*/
public void waitMasterStepComputeDone(int superstep) {
LOG.info("Worker({}) is waiting for master superstep-compute-done({})",
this.workerInfo.id(), superstep);
String path = this.constructPath(BspEvent.BSP_MASTER_SUPERSTEP_PREPARED,
String path = this.constructPath(BspEvent.BSP_MASTER_STEP_COMPUTE_DONE,
superstep);
this.bspClient().get(path, this.barrierOnMasterTimeout(),
this.logInterval());
LOG.info("Worker({}) waited master superstep-prepared({})",
LOG.info("Worker({}) waited master superstep-compute-done({})",
this.workerInfo.id(), superstep);
}

/**
* Worker set this signal after sent all messages to corresponding
* workers and sent aggregators to master.
*/
public void workerSuperstepDone(int superstep, WorkerStat workerStat) {
String path = this.constructPath(BspEvent.BSP_WORKER_SUPERSTEP_DONE,
public void workerStepDone(int superstep, WorkerStat workerStat) {
String path = this.constructPath(BspEvent.BSP_WORKER_STEP_DONE,
superstep, this.workerInfo.id());
this.bspClient().put(path, SerializeUtil.toBytes(workerStat));
LOG.info("Worker({}) set superstep-done({}), worker stat: {}",
@@ -180,10 +210,10 @@ public void workerSuperstepDone(int superstep, WorkerStat workerStat) {
* and master computes MasterComputation, and broadcast all aggregators to
* works.
*/
public SuperstepStat waitMasterSuperstepDone(int superstep) {
public SuperstepStat waitMasterStepDone(int superstep) {
LOG.info("Worker({}) is waiting for master superstep-done({})",
this.workerInfo.id(), superstep);
String path = this.constructPath(BspEvent.BSP_MASTER_SUPERSTEP_DONE,
String path = this.constructPath(BspEvent.BSP_MASTER_STEP_DONE,
superstep);
byte[] bytes = this.bspClient().get(path, this.barrierOnMasterTimeout(),
this.logInterval());
@@ -196,12 +226,34 @@ public SuperstepStat waitMasterSuperstepDone(int superstep) {

/**
* Worker set this signal to indicate the worker has outputted the result.
* It can successfully exit.
*/
public void workerOutputDone() {
String path = this.constructPath(BspEvent.BSP_WORKER_OUTPUT_DONE,
this.workerInfo.id());
this.bspClient().put(path, Constants.EMPTY_BYTES);
LOG.info("Worker({}) set output-done", this.workerInfo.id());
}

/**
* Worker set this signal to indicate the worker has stopped the managers
* and will successfully exit.
*/
public void workerCloseDone() {
String path = this.constructPath(BspEvent.BSP_WORKER_CLOSE_DONE,
this.workerInfo.id());
this.bspClient().put(path, Constants.EMPTY_BYTES);
LOG.info("Worker({}) set close-done", this.workerInfo.id());
}

// Note: The workerInfo in Bsp4Worker is the same object in WorkerService.
private void assignThisWorkerId(List<ContainerInfo> workersFromMaster) {
for (ContainerInfo container : workersFromMaster) {
if (this.workerInfo.uniqueName().equals(container.uniqueName())) {
this.workerInfo.id(container.id());
LOG.info("Worker({}) assigned id {} from master",
this.workerInfo.uniqueName(), this.workerInfo.id());
break;
}
}
}
}

0 comments on commit a125f78

Please sign in to comment.