Skip to content
Permalink
Browse files
fix: vertex at compute0() may be inactive (#134)
  • Loading branch information
javeme committed Nov 4, 2021
1 parent 161cc89 commit 382130a5575a81f7e9372601b846a771b86d10ba
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 53 deletions.
@@ -133,23 +133,16 @@ public WorkerStat compute(ComputationContext context, int superstep) {
WorkerStat workerStat = new WorkerStat();
Map<Integer, PartitionStat> partitionStats = new HashMap<>(
this.partitions.size());
if (superstep == 0) {
// TODO: parallel compute process.
for (FileGraphPartition<M> partition : this.partitions.values()) {
PartitionStat stat = partition.compute0(context,
this.computation);
partitionStats.put(stat.partitionId(), stat);
}
} else {
// TODO: parallel compute process.
for (FileGraphPartition<M> partition : this.partitions.values()) {
PartitionStat stat = partition.compute(context,
this.computation,
superstep);
partitionStats.put(stat.partitionId(), stat);
}
// TODO: parallel compute process.
for (FileGraphPartition<M> partition : this.partitions.values()) {
PartitionStat stat = partition.compute(context,
this.computation,
superstep);
partitionStats.put(stat.partitionId(), stat);
}

this.sendManager.finishSend(MessageType.MSG);

// After compute and send finish signal.
Map<Integer, MessageStat> recvStats = this.recvManager.messageStats();
for (Map.Entry<Integer, PartitionStat> entry :
@@ -121,50 +121,71 @@ protected PartitionStat input(PeekableIterator<KvEntry> vertices,
this.edgeCount, 0L);
}

protected PartitionStat compute0(ComputationContext context,
Computation<M> computation) {
long activeVertexCount = 0L;
protected PartitionStat compute(ComputationContext context,
Computation<M> computation,
int superstep) {
try {
this.beforeCompute(0);
this.beforeCompute(superstep);
} catch (IOException e) {
throw new ComputerException(
"Error occurred when beforeCompute at superstep 0", e);
"Error occurred when beforeCompute at superstep %s",
e, superstep);
}

long activeVertexCount;
try {
activeVertexCount = superstep == 0 ?
this.compute0(context, computation) :
this.compute1(context, computation, superstep);
} catch (Exception e) {
throw new ComputerException(
"Error occurred when compute at superstep %s",
e, superstep);
}

try {
this.afterCompute(superstep);
} catch (Exception e) {
throw new ComputerException(
"Error occurred when afterCompute at superstep %s",
e, superstep);
}

return new PartitionStat(this.partition, this.vertexCount,
this.edgeCount,
this.vertexCount - activeVertexCount);
}


private long compute0(ComputationContext context,
Computation<M> computation) {
long activeVertexCount = 0L;
while (this.vertexInput.hasNext()) {
Vertex vertex = this.vertexInput.next();
vertex.reactivate();

Edges edges = this.edgesInput.edges(this.vertexInput.idPointer());
vertex.edges(edges);

computation.compute0(context, vertex);

if (vertex.active()) {
activeVertexCount++;
}

try {
this.saveVertex(vertex);
this.saveVertexStatusAndValue(vertex);
} catch (IOException e) {
throw new ComputerException(
"Error occurred when saveVertex: %s", e, vertex);
}
}
try {
this.afterCompute(0);
} catch (Exception e) {
throw new ComputerException("Error occurred when afterCompute", e);
}
return new PartitionStat(this.partition, this.vertexCount,
this.edgeCount,
this.vertexCount - activeVertexCount);
return activeVertexCount;
}

protected PartitionStat compute(ComputationContext context,
Computation<M> computation,
int superstep) {
try {
this.beforeCompute(superstep);
} catch (IOException e) {
throw new ComputerException(
"Error occurred when beforeCompute at superstep %s",
e, superstep);
}
private long compute1(ComputationContext context,
Computation<M> computation,
int superstep) {
Value<?> result = this.context.config().createObject(
ComputerOptions.ALGORITHM_RESULT_CLASS);
long activeVertexCount = 0L;
@@ -195,22 +216,13 @@ protected PartitionStat compute(ComputationContext context,
}

try {
this.saveVertex(vertex);
this.saveVertexStatusAndValue(vertex);
} catch (IOException e) {
throw new ComputerException(
"Error occurred when saveVertex", e);
}
}
try {
this.afterCompute(superstep);
} catch (Exception e) {
throw new ComputerException(
"Error occurred when afterCompute at superstep %s",
e, superstep);
}
return new PartitionStat(this.partition, this.vertexCount,
this.edgeCount,
this.vertexCount - activeVertexCount);
return activeVertexCount;
}

protected PartitionStat output() {
@@ -231,6 +243,7 @@ protected PartitionStat output() {

Edges edges = this.edgesInput.edges(this.vertexInput.idPointer());
vertex.edges(edges);

output.write(vertex);
}

@@ -267,19 +280,19 @@ private void readVertexStatusAndValue(Vertex vertex, Value<?> result) {
}
} catch (IOException e) {
throw new ComputerException(
"Failed to read status of vertex %s", e, vertex);
"Failed to read status of vertex '%s'", e, vertex);
}

try {
result.read(this.preValueInput);
vertex.value(result);
} catch (IOException e) {
throw new ComputerException(
"Failed to read status of vertex %s", e, vertex);
"Failed to read value of vertex '%s'", e, vertex);
}
}

private void saveVertex(Vertex vertex) throws IOException {
private void saveVertexStatusAndValue(Vertex vertex) throws IOException {
this.curStatusOutput.writeBoolean(vertex.active());
Value<?> value = vertex.value();
E.checkNotNull(value, "Vertex's value can't be null");

0 comments on commit 382130a

Please sign in to comment.