Skip to content
Permalink
Browse files
Fix bug: some input splits missed in iteration (#75)
  • Loading branch information
Linary committed Aug 3, 2021
1 parent c5648ee commit 9a53ac062dfee9cd485c5b003924763fe83122e0
Showing 3 changed files with 15 additions and 11 deletions.
@@ -19,12 +19,17 @@

package com.baidu.hugegraph.computer.core.input;

import org.slf4j.Logger;

import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.manager.Manager;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.Log;

public class MasterInputManager implements Manager {

private static final Logger LOG = Log.logger(MasterInputManager.class);

public static final String NAME = "master_input";

private InputSplitFetcher fetcher;
@@ -39,8 +44,10 @@ public String name() {
public void init(Config config) {
this.fetcher = InputSourceFactory.createInputSplitFetcher(config);
this.handler = new MasterInputHandler(this.fetcher);
this.handler.createVertexInputSplits();
this.handler.createEdgeInputSplits();
int vertexSplitSize = this.handler.createVertexInputSplits();
int edgeSplitSize = this.handler.createEdgeInputSplits();
LOG.info("Master create {} vertex splits, {} edge splits",
vertexSplitSize, edgeSplitSize);
}

@Override
@@ -295,7 +295,8 @@ private SuperstepStat inputstep() {
SuperstepStat superstepStat = SuperstepStat.from(workerStats);
this.bsp4Master.masterStepDone(Constants.INPUT_SUPERSTEP,
superstepStat);
LOG.info("{} MasterService inputstep finished", this);
LOG.info("{} MasterService inputstep finished with superstat {}",
this, superstepStat);
return superstepStat;
}

@@ -98,7 +98,7 @@ public IteratorFromVertex() {
@Override
public boolean hasNext() {
VertexFetcher vertexFetcher = fetcher.vertexFetcher();
if (this.currentSplit == null || !vertexFetcher.hasNext()) {
while (this.currentSplit == null || !vertexFetcher.hasNext()) {
/*
* The first time or the current split is complete,
* need to fetch next input split meta
@@ -109,9 +109,7 @@ public boolean hasNext() {
}
vertexFetcher.prepareLoadInputSplit(this.currentSplit);
}
assert this.currentSplit != null &&
!this.currentSplit.equals(InputSplit.END_SPLIT);
return vertexFetcher.hasNext();
return true;
}

@Override
@@ -160,7 +158,7 @@ public IteratorFromEdge() {
@Override
public boolean hasNext() {
EdgeFetcher edgeFetcher = fetcher.edgeFetcher();
if (this.currentSplit == null || !edgeFetcher.hasNext()) {
while (this.currentSplit == null || !edgeFetcher.hasNext()) {
/*
* The first time or the current split is complete,
* need to fetch next input split meta
@@ -171,9 +169,7 @@ public boolean hasNext() {
}
edgeFetcher.prepareLoadInputSplit(this.currentSplit);
}
assert this.currentSplit != null &&
!this.currentSplit.equals(InputSplit.END_SPLIT);
return edgeFetcher.hasNext();
return true;
}

@Override

0 comments on commit 9a53ac0

Please sign in to comment.