Skip to content
Permalink
Browse files
GIRAPH-1139
closes #30
  • Loading branch information
neggert authored and edunov committed Aug 28, 2017
1 parent 5ed0d65 commit cc489350eba8db8bc62a4fca77b7ad9aa14bcf3d
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 24 deletions.
@@ -188,12 +188,12 @@
private long cachedApplicationAttempt = UNSET_APPLICATION_ATTEMPT;
/** Job id, to ensure uniqueness */
private final String jobId;
/** Task partition, to ensure uniqueness */
private final int taskPartition;
/** Task id, from partition and application attempt to ensure uniqueness */
private final int taskId;
/** My hostname */
private final String hostname;
/** Combination of hostname '_' partition (unique id) */
private final String hostnamePartitionId;
/** Combination of hostname '_' task (unique id) */
private final String hostnameTaskId;
/** Graph partitioner */
private final GraphPartitionerFactory<I, V, E> graphPartitionerFactory;
/** Mapper that will do the graph computation */
@@ -231,16 +231,15 @@ public BspService(
this.context = context;
this.graphTaskManager = graphTaskManager;
this.conf = graphTaskManager.getConf();

this.jobId = conf.getJobId();
this.taskPartition = conf.getTaskPartition();
this.restartedSuperstep = conf.getLong(
GiraphConstants.RESTART_SUPERSTEP, UNSET_SUPERSTEP);
try {
this.hostname = conf.getLocalHostname();
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
this.hostnamePartitionId = hostname + "_" + getTaskPartition();
this.graphPartitionerFactory = conf.createGraphPartitioner();

basePath = ZooKeeperManager.getBasePath(conf) + BASE_DIR + "/" + jobId;
@@ -252,6 +251,8 @@ public BspService(
applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR;
cleanedUpPath = basePath + CLEANED_UP_DIR;



String restartJobId = RESTART_JOB_ID.get(conf);

savedCheckpointBasePath =
@@ -272,7 +273,7 @@ public BspService(
}
if (LOG.isInfoEnabled()) {
LOG.info("BspService: Connecting to ZooKeeper with job " + jobId +
", " + getTaskPartition() + " on " + serverPortList);
", partition " + conf.getTaskPartition() + " on " + serverPortList);
}
try {
this.zk = new ZooKeeperExt(serverPortList,
@@ -288,6 +289,10 @@ public BspService(
throw new RuntimeException(e);
}

this.taskId = (int) getApplicationAttempt() * conf.getMaxWorkers() +
conf.getTaskPartition();
this.hostnameTaskId = hostname + "_" + getTaskId();

//Trying to restart from the latest superstep
if (restartJobId != null &&
restartedSuperstep == UNSET_SUPERSTEP) {
@@ -529,12 +534,12 @@ public final String getHostname() {
return hostname;
}

public final String getHostnamePartitionId() {
return hostnamePartitionId;
public final String getHostnameTaskId() {
return hostnameTaskId;
}

public final int getTaskPartition() {
return taskPartition;
public final int getTaskId() {
return taskId;
}

public final GraphTaskManager<I, V, E> getGraphTaskManager() {
@@ -579,15 +579,15 @@ private void logMissingWorkersOnSuperstep(
if (LOG.isInfoEnabled()) {
Set<Integer> partitionSet = new TreeSet<Integer>();
for (WorkerInfo workerInfo : healthyWorkerInfoList) {
partitionSet.add(workerInfo.getTaskId());
partitionSet.add(workerInfo.getTaskId() % maxWorkers);
}
for (WorkerInfo workerInfo : unhealthyWorkerInfoList) {
partitionSet.add(workerInfo.getTaskId());
partitionSet.add(workerInfo.getTaskId() % maxWorkers);
}
for (int i = 1; i <= maxWorkers; ++i) {
if (partitionSet.contains(Integer.valueOf(i))) {
continue;
} else if (i == getTaskPartition()) {
} else if (i == getTaskId() % maxWorkers) {
continue;
} else {
LOG.info("logMissingWorkersOnSuperstep: No response from " +
@@ -802,7 +802,7 @@ public boolean becomeMaster() {
try {
myBid =
getZkExt().createExt(masterElectionPath +
"/" + getHostnamePartitionId(),
"/" + getHostnameTaskId(),
null,
Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL,
@@ -841,7 +841,7 @@ public boolean becomeMaster() {
}
if (masterChildArr.get(0).equals(myBid)) {
GiraphStats.getInstance().getCurrentMasterTaskPartition().
setValue(getTaskPartition());
setValue(getTaskId());

globalCommHandler = new MasterGlobalCommHandler(
new MasterAggregatorHandler(getConfiguration(), getContext()),
@@ -860,7 +860,7 @@ public boolean becomeMaster() {
getGraphTaskManager().createUncaughtExceptionHandler());
masterInfo.setInetSocketAddress(masterServer.getMyAddress(),
masterServer.getLocalHostOrIp());
masterInfo.setTaskId(getTaskPartition());
masterInfo.setTaskId(getTaskId());
masterClient =
new NettyMasterClient(getContext(), getConfiguration(), this,
getGraphTaskManager().createUncaughtExceptionHandler());
@@ -1211,6 +1211,7 @@ public void restartFromCheckpoint(long checkpoint) {
setApplicationAttempt(getApplicationAttempt() + 1);
setCachedSuperstep(checkpoint);
setRestartedSuperstep(checkpoint);
checkpointStatus = CheckpointStatus.NONE;
setJobState(ApplicationState.START_SUPERSTEP,
getApplicationAttempt(),
checkpoint);
@@ -1740,7 +1741,7 @@ private CheckpointStatus getCheckpointStatus(long superstep) {
if (checkpointFrequency == 0) {
return CheckpointStatus.NONE;
}
long firstCheckpoint = INPUT_SUPERSTEP + 1 + checkpointFrequency;
long firstCheckpoint = INPUT_SUPERSTEP + 1;
if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
firstCheckpoint = getRestartedSuperstep() + checkpointFrequency;
}
@@ -1912,7 +1913,7 @@ public void cleanup(SuperstepState superstepState) throws IOException {
// for workers and masters, the master will clean up the ZooKeeper
// znodes associated with this job.
String masterCleanedUpPath = cleanedUpPath + "/" +
getTaskPartition() + MASTER_SUFFIX;
getTaskId() + MASTER_SUFFIX;
try {
String finalFinishedPath =
getZkExt().createExt(masterCleanedUpPath,
@@ -216,7 +216,7 @@ public BspServiceWorker(
graphTaskManager.createUncaughtExceptionHandler());
workerInfo.setInetSocketAddress(workerServer.getMyAddress(),
workerServer.getLocalHostOrIp());
workerInfo.setTaskId(getTaskPartition());
workerInfo.setTaskId(getTaskId());
workerClient = new NettyWorkerClient<I, V, E>(context, conf, this,
graphTaskManager.createUncaughtExceptionHandler());
workerServer.setFlowControl(workerClient.getFlowControl());
@@ -243,7 +243,7 @@ public BspServiceWorker(
}
observers = conf.createWorkerObservers(context);

WorkerProgress.get().setTaskId(getTaskPartition());
WorkerProgress.get().setTaskId(getTaskId());
workerProgressWriter = conf.trackJobProgressOnClient() ?
new WorkerProgressWriter(graphTaskManager.getJobProgressTracker()) :
null;
@@ -921,7 +921,7 @@ private void writeFinshedSuperstepInfoToZK(

String finishedWorkerPath =
getWorkerFinishedPath(getApplicationAttempt(), getSuperstep()) +
"/" + getHostnamePartitionId();
"/" + workerInfo.getHostnameId();
try {
getZkExt().createExt(finishedWorkerPath,
workerFinishedInfoObj.toString().getBytes(Charset.defaultCharset()),
@@ -1202,7 +1202,7 @@ public void cleanup(FinishedSuperstepStats finishedSuperstepStats)
// for workers and masters, the master will clean up the ZooKeeper
// znodes associated with this job.
String workerCleanedUpPath = cleanedUpPath + "/" +
getTaskPartition() + WORKER_SUFFIX;
getTaskId() + WORKER_SUFFIX;
try {
String finalFinishedPath =
getZkExt().createExt(workerCleanedUpPath,
@@ -1303,7 +1303,7 @@ public void storeCheckpoint() throws IOException {
// Notify master that checkpoint is stored
String workerWroteCheckpoint =
getWorkerWroteCheckpointPath(getApplicationAttempt(),
getSuperstep()) + "/" + getHostnamePartitionId();
getSuperstep()) + "/" + workerInfo.getHostnameId();
try {
getZkExt().createExt(workerWroteCheckpoint,
new byte[0],

0 comments on commit cc48935

Please sign in to comment.