diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/receiver/MessageRecvManager.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/receiver/MessageRecvManager.java index b918229c3..9a8c258d8 100644 --- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/receiver/MessageRecvManager.java +++ b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/receiver/MessageRecvManager.java @@ -113,6 +113,21 @@ public void beforeSuperstep(Config config, int superstep) { @Override public void afterSuperstep(Config config, int superstep) { + final int firstMsgSuperstep = Constants.INPUT_SUPERSTEP + 1; + + if (superstep > firstMsgSuperstep) { + this.messagePartitions.clearOldFiles(superstep - 1); + } else { + assert superstep == firstMsgSuperstep; + + assert this.vertexPartitions != null; + this.vertexPartitions.clearOldFiles(Constants.INPUT_SUPERSTEP); + this.vertexPartitions = null; + + assert this.edgePartitions != null; + this.edgePartitions.clearOldFiles(Constants.INPUT_SUPERSTEP); + this.edgePartitions = null; + } } @Override @@ -198,7 +213,6 @@ public Map> vertexPartitions() { E.checkState(this.vertexPartitions != null, "The vertexPartitions can't be null"); VertexMessageRecvPartitions partitions = this.vertexPartitions; - this.vertexPartitions = null; return partitions.iterators(); } @@ -206,7 +220,6 @@ public Map> edgePartitions() { E.checkState(this.edgePartitions != null, "The edgePartitions can't be null"); EdgeMessageRecvPartitions partitions = this.edgePartitions; - this.edgePartitions = null; return partitions.iterators(); } diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/receiver/MessageRecvPartitions.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/receiver/MessageRecvPartitions.java index db39b5706..ea24bbaec 100644 --- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/receiver/MessageRecvPartitions.java +++ b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/receiver/MessageRecvPartitions.java @@ -19,9 +19,13 @@ package com.baidu.hugegraph.computer.core.receiver; +import java.io.File; import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.commons.io.FileUtils; + import com.baidu.hugegraph.computer.core.common.ComputerContext; import com.baidu.hugegraph.computer.core.config.Config; import com.baidu.hugegraph.computer.core.network.buffer.ManagedBuffer; @@ -90,4 +94,17 @@ public Map messageStats() { } return entries; } + + // Clear all directory of assign superstep files + public void clearOldFiles(int superstep) { + P partition = this.partitions.values().stream() + .findFirst().orElse(null); + if (partition != null) { + List dirs = this.fileGenerator + .superstepDirs(superstep, partition.type()); + for (String dir : dirs) { + FileUtils.deleteQuietly(new File(dir)); + } + } + } } diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/store/FileGenerator.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/store/FileGenerator.java index 8a5169b99..844a7474d 100644 --- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/store/FileGenerator.java +++ b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/store/FileGenerator.java @@ -20,6 +20,7 @@ package com.baidu.hugegraph.computer.core.store; import java.nio.file.Paths; +import java.util.List; import java.util.UUID; public interface FileGenerator { @@ -62,4 +63,6 @@ default String randomDirectory(String... paths) { UUID.randomUUID().toString()) .toString(); } + + List dirs(); } diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/store/FileManager.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/store/FileManager.java index 0b1be49c6..52193e42f 100644 --- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/store/FileManager.java +++ b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/store/FileManager.java @@ -98,6 +98,11 @@ public String nextDirectory() { return this.dirs.get(index % this.dirs.size()); } + @Override + public List dirs() { + return this.dirs; + } + /** * Creates the directory named by specified dir. */ diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/store/SuperstepFileGenerator.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/store/SuperstepFileGenerator.java index f4ced1633..46816f5cc 100644 --- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/store/SuperstepFileGenerator.java +++ b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/store/SuperstepFileGenerator.java @@ -19,6 +19,9 @@ package com.baidu.hugegraph.computer.core.store; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; public class SuperstepFileGenerator { @@ -37,4 +40,18 @@ public String nextPath(String type) { UUID.randomUUID().toString()}; return this.fileGenerator.nextDirectory(paths); } + + /* + Get all directory of assign superstep files. + It will used for delete old files and file must be use nextPath function + to generate otherwise will can't delete old files. + */ + public List superstepDirs(int superstep, String type) { + List superstepDirs = new ArrayList<>(); + String[] paths = {type, Integer.toString(superstep)}; + for (String dir : this.fileGenerator.dirs()) { + superstepDirs.add(Paths.get(dir, paths).toString()); + } + return superstepDirs; + } }