diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/compute/input/EdgesInput.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/compute/input/EdgesInput.java index be76d6f48..918b6531b 100644 --- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/compute/input/EdgesInput.java +++ b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/compute/input/EdgesInput.java @@ -44,10 +44,12 @@ public class EdgesInput { private RandomAccessInput input; private final ReusablePointer idPointer; private final ReusablePointer valuePointer; - private final File edgeFile; private final GraphFactory graphFactory; - private final int flushThreshold; private final EdgeFrequency frequency; + private final File edgeFile; + private final int flushThreshold; + private final int edgeLimitNum; + private static final int UNLIMITED_NUM = -1; public EdgesInput(ComputerContext context, File edgeFile) { this.graphFactory = context.graphFactory(); @@ -57,6 +59,8 @@ public EdgesInput(ComputerContext context, File edgeFile) { this.flushThreshold = context.config().get( ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX); this.frequency = context.config().get(ComputerOptions.INPUT_EDGE_FREQ); + this.edgeLimitNum = context + .config().get(ComputerOptions.INPUT_LIMIT_EDGES_IN_ONE_VERTEX); } public void init() throws IOException { @@ -73,14 +77,15 @@ public Edges edges(ReusablePointer vidPointer) { long startPosition = this.input.position(); this.idPointer.read(this.input); int status = vidPointer.compareTo(this.idPointer); - if (status < 0) { // No edges + if (status < 0) { /* - * The current batch belong to vertex that vertex id is - * bigger than specified id. + * No edges, the current batch belong to vertex that + * vertex id is bigger than specified id. */ this.input.seek(startPosition); return EmptyEdges.instance(); - } else if (status == 0) { // Has edges + } else if (status == 0) { + // Has edges this.valuePointer.read(this.input); Edges edges = this.readEdges(this.valuePointer.input()); if (edges.size() < this.flushThreshold) { @@ -207,16 +212,27 @@ public Edge next() { } } - // TODO: use one reused Edges instance to read batches for each vertex. + /** + * Read edges & attach it by input stream, also could limit the edges here + * TODO: use one reused Edges instance to read batches for each vertex & + * limit edges in early step (like input/send stage) + */ private Edges readEdges(RandomAccessInput in) { try { + // Could limit edges to read here (unlimited by default) int count = in.readFixedInt(); + // update count when "-1 < limitNum < count" + if (this.edgeLimitNum != UNLIMITED_NUM && + this.edgeLimitNum < count) { + count = this.edgeLimitNum; + } + Edges edges = this.graphFactory.createEdges(count); if (this.frequency == EdgeFrequency.SINGLE) { for (int i = 0; i < count; i++) { Edge edge = this.graphFactory.createEdge(); // Only use targetId as subKey, use props as subValue - boolean inv = (in.readByte() == 1) ? true : false; + boolean inv = (in.readByte() == 1); edge.targetId(StreamGraphInput.readId(in)); // Read subValue edge.id(StreamGraphInput.readId(in)); @@ -230,7 +246,7 @@ private Edges readEdges(RandomAccessInput in) { for (int i = 0; i < count; i++) { Edge edge = this.graphFactory.createEdge(); // Use label + targetId as subKey, use props as subValue - boolean inv = (in.readByte() == 1) ? true : false; + boolean inv = (in.readByte() == 1); edge.label(StreamGraphInput.readLabel(in)); edge.targetId(StreamGraphInput.readId(in)); // Read subValue @@ -248,7 +264,7 @@ private Edges readEdges(RandomAccessInput in) { * Use label + sortValues + targetId as subKey, * use properties as subValue */ - boolean inv = (in.readByte() == 1) ? true : false; + boolean inv = (in.readByte() == 1); edge.label(StreamGraphInput.readLabel(in)); edge.name(StreamGraphInput.readLabel(in)); edge.targetId(StreamGraphInput.readId(in)); diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/config/ComputerOptions.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/config/ComputerOptions.java index 5414d2287..6933c2430 100644 --- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/config/ComputerOptions.java +++ b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/config/ComputerOptions.java @@ -175,6 +175,16 @@ public static synchronized ComputerOptions instance() { 200 ); + public static final ConfigOption INPUT_LIMIT_EDGES_IN_ONE_VERTEX = + new ConfigOption<>( + "input.limit_edges_in_one_vertex", + "The maximum number of adjacent edges allowed to be " + + "attached to a vertex, the adjacent edges will be " + + "stored and transferred together as a batch unit.", + disallowEmpty(), + -1 + ); + public static final ConfigOption SORT_THREAD_NUMS = new ConfigOption<>( "sort.thread_nums",