Skip to content
Permalink
Browse files
release file descriptor after input and compute (#120)
  • Loading branch information
corgiboygsj committed Oct 21, 2021
1 parent 89fe866 commit 775526b76a94d3582e039b30cd7cf45bb2d80b93
Showing 3 changed files with 41 additions and 8 deletions.
@@ -25,6 +25,7 @@
import org.slf4j.Logger;

import com.baidu.hugegraph.computer.core.common.ComputerContext;
import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.graph.partition.PartitionStat;
import com.baidu.hugegraph.computer.core.graph.value.Value;
import com.baidu.hugegraph.computer.core.manager.Managers;
@@ -73,11 +74,41 @@ public WorkerStat input() {
for (Map.Entry<Integer, PeekableIterator<KvEntry>> entry :
vertices.entrySet()) {
int partition = entry.getKey();
PeekableIterator<KvEntry> vertexIter = entry.getValue();
PeekableIterator<KvEntry> edgesIter =
edges.getOrDefault(
partition,
PeekableIterator.emptyIterator());

FileGraphPartition<M> part = new FileGraphPartition<>(this.context,
this.managers,
partition);
PartitionStat partitionStat = part.input(entry.getValue(),
edges.get(partition));
PartitionStat partitionStat = null;
ComputerException inputException = null;
try {
partitionStat = part.input(vertexIter, edgesIter);
} catch (ComputerException e) {
inputException = e;
} finally {
try {
vertexIter.close();
edgesIter.close();
} catch (Exception e) {
String message = "Failed to close vertex or edge file " +
"iterator";
ComputerException closeException = new ComputerException(
message, e);
if (inputException != null) {
inputException.addSuppressed(closeException);
} else {
throw closeException;
}
}
if (inputException != null) {
throw inputException;
}
}

workerStat.add(partitionStat);
this.partitions.put(partition, part);
}
@@ -95,9 +95,6 @@ public FileGraphPartition(ComputerContext context,

protected PartitionStat input(PeekableIterator<KvEntry> vertices,
PeekableIterator<KvEntry> edges) {
if (edges == null) {
edges = PeekableIterator.emptyIterator();
}
try {
createFile(this.vertexFile);
createFile(this.edgeFile);
@@ -150,7 +147,7 @@ protected PartitionStat compute0(ComputationContext context,
}
try {
this.afterCompute(0);
} catch (IOException e) {
} catch (Exception e) {
throw new ComputerException("Error occurred when afterCompute", e);
}
return new PartitionStat(this.partition, this.vertexCount,
@@ -206,7 +203,7 @@ protected PartitionStat compute(ComputationContext context,
}
try {
this.afterCompute(superstep);
} catch (IOException e) {
} catch (Exception e) {
throw new ComputerException(
"Error occurred when afterCompute at superstep %s",
e, superstep);
@@ -371,10 +368,11 @@ private void beforeCompute(int superstep) throws IOException {
this.curValueOutput = new BufferedFileOutput(this.curValueFile);
}

private void afterCompute(int superstep) throws IOException {
private void afterCompute(int superstep) throws Exception {
this.vertexInput.close();
this.edgesInput.close();
if (superstep != 0) {
this.messageInput.close();
this.preStatusInput.close();
this.preValueInput.close();
this.preStatusFile.delete();
@@ -71,6 +71,10 @@ public Iterator<T> iterator(ReusablePointer vidPointer) {
return new MessageIterator(vidPointer);
}

public void close() throws Exception {
this.messages.close();
}

private class MessageIterator implements Iterator<T> {

// It indicates whether the value can be returned to client.

0 comments on commit 775526b

Please sign in to comment.