Skip to content
Permalink
Browse files
Load vertices before edges when check-vertex is true (#206)
* Wait vertices finished before load edges
* Log parse and load progress info to track status
* Fix sensitive pass

Change-Id: If1f6102afe6b1ed090520aecd52a9641606f446d
  • Loading branch information
Linary committed Apr 28, 2021
1 parent e6915d8 commit 05967f00cce23e495a1ef8b03cb044348a1dfecc
Showing 16 changed files with 230 additions and 107 deletions.
@@ -6,7 +6,7 @@

<groupId>com.baidu.hugegraph</groupId>
<artifactId>hugegraph-loader</artifactId>
<version>0.11.1</version>
<version>0.11.2</version>
<packaging>jar</packaging>

<name>hugegraph-loader</name>
@@ -62,7 +62,7 @@
<dependency>
<groupId>com.baidu.hugegraph</groupId>
<artifactId>hugegraph-common</artifactId>
<version>1.8.4</version>
<version>1.8.5</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
@@ -78,7 +78,7 @@
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
<version>2.7</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
@@ -56,7 +56,7 @@

public final class HugeGraphLoader {

private static final Logger LOG = Log.logger(HugeGraphLoader.class);
public static final Logger LOG = Log.logger(HugeGraphLoader.class);

private final LoadContext context;
private final LoadMapping mapping;
@@ -133,9 +133,9 @@ private void clearAllDataIfNeeded() {
HugeClient client = HugeClientHolder.create(options);
String message = "I'm sure to delete all data";

LOG.info("Prepare to clear the data of graph {}", options.graph);
LOG.info("Prepare to clear the data of graph '{}'", options.graph);
client.graphs().clear(options.graph, message);
LOG.info("The graph {} has been cleared successfully", options.graph);
LOG.info("The graph '{}' has been cleared successfully", options.graph);

options.timeout = requestTimeout;
client.close();
@@ -161,7 +161,6 @@ private void createSchema() {
}

private void loadInputs() {
LOG.info("Start loading");
Printer.printRealtimeProgress(this.context);
LoadOptions options = this.context.options();
LoadSummary summary = this.context.summary();
@@ -170,10 +169,10 @@ private void loadInputs() {
try {
if (!options.failureMode) {
// Load normal data from user supplied input structs
this.load(this.mapping.structs());
this.loadInputs(this.mapping.structs());
} else {
// Load failure data from generated input structs
this.load(this.mapping.structsForFailure(options));
this.loadInputs(this.mapping.structsForFailure(options));
}
// Waiting for async worker threads finish
this.manager.waitFinished();
@@ -185,7 +184,24 @@ private void loadInputs() {
Printer.printFinalProgress(this.context);
}

private void load(List<InputStruct> structs) {
/**
 * Load the given input structs, honoring the check-vertex option.
 *
 * When check-vertex is enabled, the structs are split into vertex-only
 * and edge-only structs: all vertex structs are loaded first, then the
 * manager is asked to wait for the vertex insert tasks, and only after
 * that the edge structs are loaded. Otherwise the structs are loaded
 * as given.
 *
 * @param structs the input structs to load
 */
private void loadInputs(List<InputStruct> structs) {
    if (!this.context.options().checkVertex) {
        // No ordering constraint: vertices and edges of the same input
        // are loaded together
        this.loadStructs(structs);
        return;
    }

    LOG.info("Forced to load vertices before edges since set " +
             "option check-vertex=true");
    SplittedInputStructs parts = this.splitStructs(structs);
    // Phase 1: every vertex struct
    this.loadStructs(parts.vertexInputStructs);
    // Barrier: wait for the submitted vertex insert tasks to finish
    this.manager.waitFinished("vertex insert tasks");
    // Phase 2: every edge struct
    this.loadStructs(parts.edgeInputStructs);
}

private void loadStructs(List<InputStruct> structs) {
// Load input structs one by one
for (InputStruct struct : structs) {
if (this.context.stopped()) {
@@ -199,7 +215,7 @@ private void load(List<InputStruct> structs) {
// Init reader
reader.init(this.context, struct);
// Load data from current input mapping
this.load(struct, reader);
this.loadStruct(struct, reader);
} catch (InitException e) {
throw new LoadException("Failed to init input reader", e);
}
@@ -210,12 +226,13 @@ private void load(List<InputStruct> structs) {
* TODO: Separate classes: ReadHandler -> ParseHandler -> InsertHandler
* Let load tasks work in pipeline mode
*/
private void load(InputStruct struct, InputReader reader) {
LOG.info("Start parsing and loading '{}'", struct);
private void loadStruct(InputStruct struct, InputReader reader) {
LOG.info("Start parsing '{}'", struct);
LoadMetrics metrics = this.context.summary().metrics(struct);
metrics.startInFlight();

ParseTaskBuilder taskBuilder = new ParseTaskBuilder(this.context,
struct);

final int batchSize = this.context.options().batchSize;
List<Line> lines = new ArrayList<>(batchSize);
for (boolean finished = false; !finished;) {
@@ -256,6 +273,9 @@ private void load(InputStruct struct, InputReader reader) {
lines = new ArrayList<>(batchSize);
}
}

metrics.stopInFlight();
LOG.info("Finish parsing '{}'", struct);
}

/**
@@ -264,13 +284,15 @@ private void load(InputStruct struct, InputReader reader) {
private void executeParseTask(InputStruct struct, ElementMapping mapping,
ParseTaskBuilder.ParseTask task) {
long start = System.currentTimeMillis();
// Sync parse
List<List<Record>> batches = task.get();
long end = System.currentTimeMillis();
this.context.summary().addTimeRange(mapping.type(), start, end);

if (this.context.options().dryRun || CollectionUtils.isEmpty(batches)) {
return;
}
// Async load
for (List<Record> batch : batches) {
this.manager.submitBatch(struct, mapping, batch);
}
@@ -315,6 +337,23 @@ private void handleParseFailure() {
}
}

/**
 * Split each input struct into its vertex-only and edge-only parts.
 *
 * A part is collected only when the extraction does not return the
 * {@code InputStruct.EMPTY} sentinel. The relative order of the source
 * structs is preserved within each resulting list.
 *
 * @param structs the input structs to split
 * @return a holder with the vertex structs and the edge structs
 */
private SplittedInputStructs splitStructs(List<InputStruct> structs) {
    SplittedInputStructs parts = new SplittedInputStructs();
    for (InputStruct source : structs) {
        InputStruct vertexPart = source.extractVertexStruct();
        // EMPTY is a shared sentinel, so identity comparison is intended
        if (vertexPart != InputStruct.EMPTY) {
            parts.vertexInputStructs.add(vertexPart);
        }
        InputStruct edgePart = source.extractEdgeStruct();
        if (edgePart != InputStruct.EMPTY) {
            parts.edgeInputStructs.add(edgePart);
        }
    }
    return parts;
}

private boolean reachedMaxReadLines() {
final long maxReadLines = this.context.options().maxReadLines;
if (maxReadLines == -1L) {
@@ -346,4 +385,15 @@ private synchronized void stopThenShutdown() {
}
}
}

/**
 * Simple holder for input structs separated into a vertex list and an
 * edge list. Both lists start out empty and are filled by the caller.
 */
private static class SplittedInputStructs {

    // Structs that carry only vertex mappings
    private final List<InputStruct> vertexInputStructs = new ArrayList<>();
    // Structs that carry only edge mappings
    private final List<InputStruct> edgeInputStructs = new ArrayList<>();
}
}
@@ -85,7 +85,7 @@ public final class LoadOptions {
@Parameter(names = {"--trust-store-password"}, arity = 1,
description = "The password of client truststore file used " +
"when https protocol is enabled")
public String trustStorePassword = null;
public String trustStoreToken = null;

@Parameter(names = {"--token"}, arity = 1,
description = "The token of graph for authentication")
@@ -30,10 +30,14 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.google.common.collect.ImmutableList;

@JsonPropertyOrder({"id", "skip", "input", "vertices", "edges"})
public class InputStruct implements Checkable {

public static final InputStruct EMPTY = new InputStruct(ImmutableList.of(),
ImmutableList.of());

@JsonProperty("id")
private String id;
@JsonProperty("skip")
@@ -104,8 +108,31 @@ public void add(ElementMapping mapping) {
}
}

/**
 * Build a struct that keeps only the vertex mappings of this struct.
 *
 * The new struct has the same id, skip flag and input as this struct,
 * and an empty edge list.
 *
 * @return the vertex-only struct, or {@link #EMPTY} when this struct
 *         has no vertex mappings
 */
public InputStruct extractVertexStruct() {
    if (!this.vertices.isEmpty()) {
        InputStruct vertexOnly = new InputStruct(this.vertices,
                                                 ImmutableList.of());
        vertexOnly.input = this.input;
        vertexOnly.skip = this.skip;
        vertexOnly.id = this.id;
        return vertexOnly;
    }
    return EMPTY;
}

/**
 * Build a struct that keeps only the edge mappings of this struct.
 *
 * The new struct has the same id, skip flag and input as this struct,
 * and an empty vertex list.
 *
 * @return the edge-only struct, or {@link #EMPTY} when this struct
 *         has no edge mappings
 */
public InputStruct extractEdgeStruct() {
    if (!this.edges.isEmpty()) {
        InputStruct edgeOnly = new InputStruct(ImmutableList.of(),
                                               this.edges);
        edgeOnly.input = this.input;
        edgeOnly.skip = this.skip;
        edgeOnly.id = this.id;
        return edgeOnly;
    }
    return EMPTY;
}

@Override
public String toString() {
return String.format("input-mapping(id=%s)", this.id);
return String.format("InputStruct{id=%s, input=%s}",
this.id, this.input);
}
}

0 comments on commit 05967f0

Please sign in to comment.