Skip to content
Permalink
Browse files
use FlatMapperIterator to refactor FileElementFetcher
  • Loading branch information
coderzc committed Dec 29, 2021
1 parent 1245582 commit 90344f31e965a81a5038a9dce771fd023e4a3286
Showing 2 changed files with 20 additions and 33 deletions.
@@ -22,18 +22,17 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;

import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.input.ElementFetcher;
import com.baidu.hugegraph.computer.core.input.InputSplit;
import com.baidu.hugegraph.iterator.FlatMapperIterator;
import com.baidu.hugegraph.loader.builder.ElementBuilder;
import com.baidu.hugegraph.loader.builder.SchemaCache;
import com.baidu.hugegraph.loader.constant.Constants;
@@ -50,16 +49,14 @@
public abstract class FileElementFetcher<T extends GraphElement>
implements ElementFetcher<T> {

private final Config config;
private final LoadContext context;
private Iterator<T> localBatch;
private InputReader inputReader;
private List<ElementBuilder<T>> builders;
private InputReader inputReader;
private FlatMapperIterator<Line, T> localBatch;
private T next;

public FileElementFetcher(Config config) {
this.config = config;
String schemaPath = this.config.get(
String schemaPath = config.get(
ComputerOptions.INPUT_LOADER_SCHEMA_PATH);
SchemaCache schemaCache;
try {
@@ -77,13 +74,22 @@ public FileElementFetcher(Config config) {

@Override
public void prepareLoadInputSplit(InputSplit split) {
FileInputSplit fileInputSplit = (FileInputSplit) split;
if (this.inputReader != null) {
this.inputReader.close();
}
this.inputReader = this.fetch(fileInputSplit);

FileInputSplit fileInputSplit = (FileInputSplit) split;
this.builders = this.elementBuilders(this.context,
fileInputSplit.struct());
this.inputReader = this.createReader(fileInputSplit);
this.localBatch = new FlatMapperIterator<>(this.inputReader, line -> {
List<T> allElements = new ArrayList<>();
for (ElementBuilder<T> builder : this.builders) {
List<T> elements = this.buildElement(line, builder);
allElements.addAll(elements);
}
return allElements.iterator();
});
}

@Override
@@ -95,27 +101,10 @@ public boolean hasNext() {
if (this.localBatch != null && this.localBatch.hasNext()) {
this.next = this.localBatch.next();
return true;
} else {
this.localBatch = null;

if (this.inputReader != null) {
while (this.inputReader.hasNext()) {
Line line = this.inputReader.next();
List<T> allElements = new ArrayList<>();
for (ElementBuilder<T> builder : this.builders) {
List<T> elements = this.buildElement(line, builder);
allElements.addAll(elements);
}
if (CollectionUtils.isNotEmpty(allElements)) {
this.localBatch = allElements.iterator();
this.next = this.localBatch.next();
return true;
}
}
this.inputReader = null;
}
return false;
}

this.localBatch = null;
return false;
}

@Override
@@ -128,11 +117,10 @@ public T next() {
return current;
}

private InputReader fetch(FileInputSplit split) {
String path = split.path();
private InputReader createReader(FileInputSplit split) {
InputStruct struct = split.struct();
FileSource source = (FileSource) struct.input();
source.path(path);
source.path(split.path());
FileReader reader = (FileReader) InputReader.create(struct.input());
reader.init(this.context, struct);
return reader;
@@ -65,5 +65,4 @@ public void testHashCode() {
"/tmp/test");
Assert.assertEquals(split1.hashCode(), split2.hashCode());
}

}

0 comments on commit 90344f3

Please sign in to comment.