Skip to content
Permalink
Browse files
standard log format of bsp module and improve some error message (#47)
  • Loading branch information
javeme committed May 10, 2021
1 parent 1aaf9c0 commit 639453a49128b8683dc51ec6524a09119e413de5
Showing 11 changed files with 99 additions and 51 deletions.
@@ -28,9 +28,9 @@
import com.baidu.hugegraph.computer.core.common.ContainerInfo;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.graph.SuperstepStat;
import com.baidu.hugegraph.computer.core.worker.WorkerStat;
import com.baidu.hugegraph.computer.core.graph.value.IntValue;
import com.baidu.hugegraph.computer.core.util.SerializeUtil;
import com.baidu.hugegraph.computer.core.worker.WorkerStat;
import com.baidu.hugegraph.util.Log;

public class Bsp4Master extends BspBase {
@@ -47,14 +47,14 @@ public Bsp4Master(Config config) {
public void registerMaster(ContainerInfo masterInfo) {
String path = this.constructPath(BspEvent.BSP_MASTER_REGISTERED);
this.bspClient().put(path, SerializeUtil.toBytes(masterInfo));
LOG.info("Master registered, masterInfo: {}", masterInfo);
LOG.info("Master is registered, master info: {}", masterInfo);
}

/**
* Wait workers registered.
*/
public List<ContainerInfo> waitWorkersRegistered() {
LOG.info("Master is waiting workers registered");
LOG.info("Master is waiting for workers registered");
String path = this.constructPath(BspEvent.BSP_WORKER_REGISTERED);
List<byte[]> serializedContainers = this.waitOnWorkersEvent(
path, this.registerTimeout());
@@ -64,7 +64,8 @@ public List<ContainerInfo> waitWorkersRegistered() {
SerializeUtil.fromBytes(serializedContainer, container);
containers.add(container);
}
LOG.info("All workers registered, workers: {}", containers);
LOG.info("Master waited all workers registered, workers: {}",
containers);
return containers;
}

@@ -75,27 +76,28 @@ public void masterSuperstepResume(int superstep) {
String path = this.constructPath(BspEvent.BSP_MASTER_SUPERSTEP_RESUME);
IntValue superstepWritable = new IntValue(superstep);
this.bspClient().put(path, SerializeUtil.toBytes(superstepWritable));
LOG.info("Master resume superstep {}", superstep);
LOG.info("Master set superstep-resume({})", superstep);
}

/**
* Wait all workers read input splits, and send all vertices and
* edges to correspond workers. After this, master call masterInputDone.
*/
public void waitWorkersInputDone() {
LOG.info("Master is waiting workers input done");
LOG.info("Master is waiting for workers input-done");
String path = this.constructPath(BspEvent.BSP_WORKER_INPUT_DONE);
this.waitOnWorkersEvent(path, this.barrierOnWorkersTimeout());
LOG.info("Master waited workers input-done");
}

/**
* The master signal workers the master input done, the workers can merge
* vertices and edges after receive this signal.
*/
public void masterInputDone() {
LOG.info("Master set input-done");
String path = this.constructPath(BspEvent.BSP_MASTER_INPUT_DONE);
this.bspClient().put(path, Constants.EMPTY_BYTES);
LOG.info("Master input done");
}

/**
@@ -105,7 +107,7 @@ public void masterInputDone() {
* synchronize superstep result.
*/
public List<WorkerStat> waitWorkersSuperstepDone(int superstep) {
LOG.info("Master is waiting workers superstep {} done", superstep);
LOG.info("Master is waiting for workers superstep-done({})", superstep);
String path = this.constructPath(BspEvent.BSP_WORKER_SUPERSTEP_DONE,
superstep);
List<byte[]> list = this.waitOnWorkersEvent(path,
@@ -116,7 +118,7 @@ public List<WorkerStat> waitWorkersSuperstepDone(int superstep) {
SerializeUtil.fromBytes(bytes, workerStat);
result.add(workerStat);
}
LOG.info("Workers superstep {} done, workers stat: {}",
LOG.info("Master waited workers superstep-done({}), workers stat: {}",
superstep, result);
return result;
}
@@ -127,21 +129,22 @@ public List<WorkerStat> waitWorkersSuperstepDone(int superstep) {
* prepared done.
*/
public void waitWorkersSuperstepPrepared(int superstep) {
LOG.info("Master is waiting workers prepare superstep {} done",
LOG.info("Master is waiting for workers superstep-prepared({})",
superstep);
String path = this.constructPath(BspEvent.BSP_WORKER_SUPERSTEP_PREPARED,
superstep);
this.waitOnWorkersEvent(path, this.barrierOnWorkersTimeout());
LOG.info("Master waited workers superstep-prepared");
}

/**
* Master signals the workers that the master superstep prepared.
*/
public void masterSuperstepPrepared(int superstep) {
LOG.info("Master set superstep-prepared({})", superstep);
String path = this.constructPath(BspEvent.BSP_MASTER_SUPERSTEP_PREPARED,
superstep);
this.bspClient().put(path, Constants.EMPTY_BYTES);
LOG.info("Master prepare superstep {} done", superstep);
}

/**
@@ -153,22 +156,23 @@ public void masterSuperstepDone(int superstep,
String path = this.constructPath(BspEvent.BSP_MASTER_SUPERSTEP_DONE,
superstep);
this.bspClient().put(path, SerializeUtil.toBytes(superstepStat));
LOG.info("Master superstep {} done, graph stat: {}",
LOG.info("Master set superstep-done({}), graph stat: {}",
superstep, superstepStat);
}

/**
* Wait workers output the vertices.
*/
public void waitWorkersOutputDone() {
LOG.info("Master is waiting workers output done");
LOG.info("Master is waiting for workers output-done");
String path = this.constructPath(BspEvent.BSP_WORKER_OUTPUT_DONE);
this.waitOnWorkersEvent(path, this.barrierOnWorkersTimeout());
LOG.info("Master waited workers output-done");
}

public void clean() {
this.bspClient().clean();
LOG.info("Clean bsp data done");
LOG.info("Cleaned up the BSP data");
}

private List<byte[]> waitOnWorkersEvent(String prefix, long timeout) {
@@ -28,9 +28,9 @@
import com.baidu.hugegraph.computer.core.common.ContainerInfo;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.graph.SuperstepStat;
import com.baidu.hugegraph.computer.core.worker.WorkerStat;
import com.baidu.hugegraph.computer.core.graph.value.IntValue;
import com.baidu.hugegraph.computer.core.util.SerializeUtil;
import com.baidu.hugegraph.computer.core.worker.WorkerStat;
import com.baidu.hugegraph.util.Log;

public class Bsp4Worker extends BspBase {
@@ -51,20 +51,23 @@ public void registerWorker() {
String path = this.constructPath(BspEvent.BSP_WORKER_REGISTERED,
this.workerInfo.id());
this.bspClient().put(path, SerializeUtil.toBytes(this.workerInfo));
LOG.info("Worker {} registered", this.workerInfo);
LOG.info("Worker is registered: {}", this.workerInfo);
}

/**
* Wait master registered, get master's information includes hostname
* and port.
*/
public ContainerInfo waitMasterRegistered() {
LOG.info("Worker({}) is waiting for master registered",
this.workerInfo.id());
String path = this.constructPath(BspEvent.BSP_MASTER_REGISTERED);
byte[] bytes = this.bspClient().get(path, this.registerTimeout(),
this.logInterval());
ContainerInfo masterInfo = new ContainerInfo();
SerializeUtil.fromBytes(bytes, masterInfo);
LOG.info("Master {} registered", masterInfo);
LOG.info("Worker({}) waited master registered: {}",
this.workerInfo.id(), masterInfo);
return masterInfo;
}

@@ -73,6 +76,9 @@ public ContainerInfo waitMasterRegistered() {
* listen on.
*/
public List<ContainerInfo> waitWorkersRegistered() {
LOG.info("Worker({}) is waiting for master all-registered",
this.workerInfo.id());
// TODO: change to wait BSP_MASTER_ALL_REGISTERED
String path = this.constructPath(BspEvent.BSP_WORKER_REGISTERED);
List<byte[]> serializedContainers = this.bspClient().getChildren(
path, this.workerCount(),
@@ -84,7 +90,8 @@ public List<ContainerInfo> waitWorkersRegistered() {
SerializeUtil.fromBytes(serializedContainer, container);
containers.add(container);
}
LOG.info("All workers registered, workers: {}", containers);
LOG.info("Worker({}) waited master all-registered, workers: {}",
this.workerInfo.id(), containers);
return containers;
}

@@ -93,13 +100,15 @@ public List<ContainerInfo> waitWorkersRegistered() {
* start with.
*/
public int waitMasterSuperstepResume() {
LOG.info("Worker({}) is waiting for master superstep-resume",
this.workerInfo.id());
String path = this.constructPath(BspEvent.BSP_MASTER_SUPERSTEP_RESUME);
byte[] bytes = this.bspClient().get(path,
this.barrierOnMasterTimeout(),
byte[] bytes = this.bspClient().get(path, this.barrierOnMasterTimeout(),
this.logInterval());
IntValue superstep = new IntValue();
SerializeUtil.fromBytes(bytes, superstep);
LOG.info("Resume from superstep {}", superstep.value());
LOG.info("Worker({}) waited superstep-resume({})",
this.workerInfo.id(), superstep.value());
return superstep.value();
}

@@ -111,18 +120,20 @@ public void workerInputDone() {
String path = this.constructPath(BspEvent.BSP_WORKER_INPUT_DONE,
this.workerInfo.id());
this.bspClient().put(path, Constants.EMPTY_BYTES);
LOG.info("Worker {} input done", this.workerInfo.id());
LOG.info("Worker({}) set input-done", this.workerInfo.id());
}

/**
* Wait master signal that all workers input done. After this, worker
* can merge the vertices and edges.
*/
public void waitMasterInputDone() {
LOG.info("Worker({}) is waiting for master input-done",
this.workerInfo.id());
String path = this.constructPath(BspEvent.BSP_MASTER_INPUT_DONE);
this.bspClient().get(path, this.barrierOnMasterTimeout(),
this.logInterval());
LOG.info("Master input done");
LOG.info("Worker({}) waited master input-done", this.workerInfo.id());
}

/**
@@ -133,7 +144,7 @@ public void workerSuperstepPrepared(int superstep) {
String path = this.constructPath(BspEvent.BSP_WORKER_SUPERSTEP_PREPARED,
superstep, this.workerInfo.id());
this.bspClient().put(path, Constants.EMPTY_BYTES);
LOG.info("Worker {} prepared superstep {} done",
LOG.info("Worker({}) set superstep-prepared({})",
this.workerInfo.id(), superstep);
}

@@ -142,11 +153,14 @@ public void workerSuperstepPrepared(int superstep) {
* to other workers.
*/
public void waitMasterSuperstepPrepared(int superstep) {
LOG.info("Waiting master prepared superstep {} done", superstep);
LOG.info("Worker({}) is waiting for master superstep-prepared({})",
this.workerInfo.id(), superstep);
String path = this.constructPath(BspEvent.BSP_MASTER_SUPERSTEP_PREPARED,
superstep);
this.bspClient().get(path, this.barrierOnMasterTimeout(),
this.logInterval());
LOG.info("Worker({}) waited master superstep-prepared({})",
this.workerInfo.id(), superstep);
}

/**
@@ -157,8 +171,8 @@ public void workerSuperstepDone(int superstep, WorkerStat workerStat) {
String path = this.constructPath(BspEvent.BSP_WORKER_SUPERSTEP_DONE,
superstep, this.workerInfo.id());
this.bspClient().put(path, SerializeUtil.toBytes(workerStat));
LOG.info("Worker superstep {} done, worker stat: {}",
superstep, workerStat);
LOG.info("Worker({}) set superstep-done({}), worker stat: {}",
this.workerInfo.id(), superstep, workerStat);
}

/**
@@ -167,15 +181,16 @@ public void workerSuperstepDone(int superstep, WorkerStat workerStat) {
* works.
*/
public SuperstepStat waitMasterSuperstepDone(int superstep) {
LOG.info("Worker({}) is waiting for master superstep-done({})",
this.workerInfo.id(), superstep);
String path = this.constructPath(BspEvent.BSP_MASTER_SUPERSTEP_DONE,
superstep);
byte[] bytes = this.bspClient().get(path,
this.barrierOnMasterTimeout(),
byte[] bytes = this.bspClient().get(path, this.barrierOnMasterTimeout(),
this.logInterval());
SuperstepStat superstepStat = new SuperstepStat();
SerializeUtil.fromBytes(bytes, superstepStat);
LOG.info("Master superstep {} done, graph stat: {}",
superstep, superstepStat);
LOG.info("Worker({}) waited master superstep-done({}), graph stat: {}",
this.workerInfo.id(), superstep, superstepStat);
return superstepStat;
}

@@ -187,6 +202,6 @@ public void workerOutputDone() {
String path = this.constructPath(BspEvent.BSP_WORKER_OUTPUT_DONE,
this.workerInfo.id());
this.bspClient().put(path, Constants.EMPTY_BYTES);
LOG.info("Worker {} output done", this.workerInfo.id());
LOG.info("Worker({}) set output-done", this.workerInfo.id());
}
}
@@ -55,6 +55,7 @@ public void init() {
this.barrierOnMasterTimeout = this.config.get(
ComputerOptions.BSP_WAIT_MASTER_TIMEOUT);
this.logInterval = this.config.get(ComputerOptions.BSP_LOG_INTERVAL);
LOG.info("Connect to BSP server: {}", this.bspClient.endpoint());
}

/**
@@ -63,7 +64,7 @@ public void init() {
*/
public void close() {
this.bspClient.close();
LOG.info("closed");
LOG.info("Closed the BSP connection: {}", this.bspClient.endpoint());
}

private BspClient createBspClient() {
@@ -38,10 +38,24 @@ interface BspClient {
*/
void clean();

/**
* Get endpoint of the bsp server.
*/
String endpoint();

/**
* Put KV pair to the bsp server.
*/
void put(String key, byte[] value);

/**
* Get value by key from the bsp server.
*/
byte[] get(String key);

/**
* Get value by key from the bsp server with timout.
*/
byte[] get(String key, long timeout, long logInterval);

/**
@@ -33,9 +33,14 @@ public EtcdBspClient(Config config) {
this.config = config;
}

@Override
public String endpoint() {
return this.config.get(ComputerOptions.BSP_ETCD_ENDPOINTS);
}

@Override
public void init() {
String endpoints = this.config.get(ComputerOptions.BSP_ETCD_ENDPOINTS);
String endpoints = this.endpoint();
String jobId = this.config.get(ComputerOptions.JOB_ID);
this.etcdClient = new EtcdClient(endpoints, jobId);
}
@@ -219,7 +219,8 @@ private byte[] waitAndGetFromPutEvent(ByteSequence keySeq, long revision,
try (Watch.Watcher watcher = this.watch.watch(keySeq, watchOption,
consumer)) {
return barrierEvent.await(timeout, logInterval, () -> {
LOG.info("Wait for key '{}'", keySeq.toString(ENCODING));
LOG.info("Wait for key '{}' with timeout {}ms",
keySeq.toString(ENCODING), timeout);
});
}
}
@@ -373,8 +374,10 @@ private List<byte[]> waitAndPrefixGetFromPutEvent(
watchOption,
consumer)) {
return barrierEvent.await(timeout, logInterval, () -> {
LOG.info("Wait for keys with prefix '{}', expect {} actual {}",
prefixSeq.toString(ENCODING), count, keyValues.size());
LOG.info("Wait for keys with prefix '{}' and timeout {}ms, " +
"expect {} keys but actual got {} keys",
prefixSeq.toString(ENCODING),
timeout, count, keyValues.size());
});
}
}

0 comments on commit 639453a

Please sign in to comment.