Skip to content
Permalink
Browse files
log job-id in BSP client (#50)
* log job-id in BSP client

* set BspBase.init() private
  • Loading branch information
javeme committed May 13, 2021
1 parent e8e60b9 commit 930f5dd5028f2b64425432ab3166c5433df15aa0
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 38 deletions.
@@ -29,33 +29,42 @@ public abstract class BspBase {

private static final Logger LOG = Log.logger(BspBase.class);

private Config config;
private BspClient bspClient;
private int workerCount;
private long registerTimeout;
private long barrierOnMasterTimeout;
private long barrierOnWorkersTimeout;
private long logInterval;
private final Config config;

private final String jobId;
private final int workerCount;
private final long registerTimeout;
private final long barrierOnMasterTimeout;
private final long barrierOnWorkersTimeout;
private final long logInterval;

private final BspClient bspClient;

public BspBase(Config config) {
this.config = config;
}

/**
* Do initialization operation, like connect to etcd or ZooKeeper cluster.
*/
public void init() {
this.bspClient = this.createBspClient();
this.bspClient.init();
this.jobId = config.get(ComputerOptions.JOB_ID);
this.workerCount = this.config.get(ComputerOptions.JOB_WORKERS_COUNT);
this.registerTimeout = this.config.get(
ComputerOptions.BSP_REGISTER_TIMEOUT);
ComputerOptions.BSP_REGISTER_TIMEOUT);
this.barrierOnWorkersTimeout = this.config.get(
ComputerOptions.BSP_WAIT_WORKERS_TIMEOUT);
this.barrierOnMasterTimeout = this.config.get(
ComputerOptions.BSP_WAIT_MASTER_TIMEOUT);
this.logInterval = this.config.get(ComputerOptions.BSP_LOG_INTERVAL);
LOG.info("Connect to BSP server: {}", this.bspClient.endpoint());

this.bspClient = this.init();
}

/**
* Do initialization operation, like connect to etcd or ZooKeeper cluster.
*/
private BspClient init() {
BspClient bspClient = this.createBspClient();
bspClient.init(this.jobId);
LOG.info("Init {} BSP connection to '{}' for job '{}'",
bspClient.type(), bspClient.endpoint(), this.jobId);
return bspClient;
}

/**
@@ -64,11 +73,12 @@ public void init() {
*/
public void close() {
this.bspClient.close();
LOG.info("Closed the BSP connection: {}", this.bspClient.endpoint());
LOG.info("Closed {} BSP connection '{}' for job '{}'",
this.bspClient.type(), this.bspClient.endpoint(), this.jobId);
}

private BspClient createBspClient() {
// TODO: the type of bsp client can be get from config
// TODO: create from factory. the type of bsp can be get from config
return new EtcdBspClient(this.config);
}

@@ -24,27 +24,33 @@
interface BspClient {

/**
* Do initialization operation, like connect to etcd cluster.
* Return bsp server type, like etcd or zookeeper.
*/
void init();
String type();

/**
* Contrary to init. Could not do any bsp operation after close is called.
* Get endpoint of the bsp server.
*/
void close();
String endpoint();

/**
* Clean the bsp data of the job.
* Do initialization operation, like connect to etcd server.
*/
void clean();
void init(String namespace);

/**
* Get endpoint of the bsp server.
* Close connection from bsp server.
* Could not do any bsp operation after close is called.
*/
String endpoint();
void close();

/**
* Clean the bsp data of the job.
*/
void clean();

/**
* Put KV pair to the bsp server.
* Put key & value to the bsp server.
*/
void put(String key, byte[] value);

@@ -26,23 +26,27 @@

public final class EtcdBspClient implements BspClient {

private final Config config;
private final String endpoints;

private EtcdClient etcdClient;

public EtcdBspClient(Config config) {
this.config = config;
this.endpoints = config.get(ComputerOptions.BSP_ETCD_ENDPOINTS);
}

@Override
public String type() {
return "etcd";
}

@Override
public String endpoint() {
return this.config.get(ComputerOptions.BSP_ETCD_ENDPOINTS);
return this.endpoints;
}

@Override
public void init() {
String endpoints = this.endpoint();
String jobId = this.config.get(ComputerOptions.JOB_ID);
this.etcdClient = new EtcdClient(endpoints, jobId);
public void init(String namespace) {
this.etcdClient = new EtcdClient(this.endpoints, namespace);
}

@Override
@@ -77,7 +77,6 @@ public void init(Config config) {
this.maxSuperStep = this.config.get(ComputerOptions.BSP_MAX_SUPER_STEP);

this.bsp4Master = new Bsp4Master(this.config);
this.bsp4Master.init();

InetSocketAddress rpcAddress = this.initManagers();

@@ -81,7 +81,6 @@ public void init(Config config) {
this.workerInfo = new ContainerInfo(0, dataAddress.getHostName(),
0, dataAddress.getPort());
this.bsp4Worker = new Bsp4Worker(this.config, this.workerInfo);
this.bsp4Worker.init();
this.computation = this.config.createObject(
ComputerOptions.WORKER_COMPUTATION_CLASS);
this.computation.init(config);
@@ -56,11 +56,9 @@ public void setup() {
);

this.bsp4Master = new Bsp4Master(config);
this.bsp4Master.init();
this.masterInfo = new ContainerInfo(-1, "localhost", 8001, 8002);
this.workerInfo = new ContainerInfo(0, "localhost", 8003, 8004);
this.bsp4Worker = new Bsp4Worker(config, this.workerInfo);
this.bsp4Worker.init();
this.maxSuperStep = config.get(ComputerOptions.BSP_MAX_SUPER_STEP);
}

0 comments on commit 930f5dd

Please sign in to comment.