Log failure records to compensate for the offset-ahead problem (#87)
Change-Id: Ibba36ae57444ded2f39261512df6704725b486d7
Linary authored and zhoney committed Oct 16, 2019
1 parent d58937d commit 5521961ae0564c90449d1c9e13cf910043e452ee
Showing 54 changed files with 1,628 additions and 521 deletions.
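
The core idea of this change: parse/insert failures are written to per-struct failure log files before the input offset is confirmed, so an incremental reload can pick the failed records back up instead of skipping past them. A minimal standalone sketch of that pattern follows; the file names, the parse helper, and the class are hypothetical illustrations, not the loader's actual API.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

public class FailureRecordSketch {

    public static void main(String[] args) throws IOException {
        Path input = Paths.get("vertices.csv");            // hypothetical input file
        Path failureLog = Paths.get("parse-failure.data"); // hypothetical failure log
        List<String> lines = Files.readAllLines(input, StandardCharsets.UTF_8);

        long confirmedOffset = 0;                // offset safe to resume from
        List<int[]> batch = new ArrayList<>();

        for (String line : lines) {
            try {
                batch.add(parse(line));
            } catch (IllegalArgumentException e) {
                // The failed record is persisted before the offset moves ahead,
                // so a later incremental run can reload it instead of losing it
                Files.write(failureLog,
                            (line + System.lineSeparator()).getBytes(StandardCharsets.UTF_8),
                            StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            }
            // Only after the line is either batched or logged is the offset confirmed
            confirmedOffset++;
        }
        System.out.println("confirmed offset=" + confirmedOffset + ", batched=" + batch.size());
    }

    // Hypothetical parser: expects "sourceId,targetId"
    private static int[] parse(String line) {
        String[] parts = line.split(",");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Invalid line: " + line);
        }
        return new int[]{Integer.parseInt(parts[0]), Integer.parseInt(parts[1])};
    }
}
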
@@ -16,30 +16,6 @@
<PatternLayout charset="${log-charset}" pattern="%-d{yyyy-MM-dd HH:mm:ss} [%t] [%-5p] %c %x - %m%n" />
<SizeBasedTriggeringPolicy size="100MB"/>
</RollingFile>
<File name="vertex-parse-error-file" fileName="logs/vertex-parse-error.data" append="false">
<PatternLayout>
<Charset>${log-charset}</Charset>
<Pattern>%m%n</Pattern>
</PatternLayout>
</File>
<File name="edge-parse-error-file" fileName="logs/edge-parse-error.data" append="false">
<PatternLayout>
<Charset>${log-charset}</Charset>
<Pattern>%m%n</Pattern>
</PatternLayout>
</File>
<File name="vertex-insert-error-file" fileName="logs/vertex-insert-error.data" append="false">
<PatternLayout>
<Charset>${log-charset}</Charset>
<Pattern>%m%n</Pattern>
</PatternLayout>
</File>
<File name="edge-insert-error-file" fileName="logs/edge-insert-error.data" append="false">
<PatternLayout>
<Charset>${log-charset}</Charset>
<Pattern>%m%n</Pattern>
</PatternLayout>
</File>
</appenders>
<loggers>
<root level="INFO">
@@ -62,19 +38,5 @@
<logger name="com.baidu.hugegraph" level="INFO" additivity="false">
<appender-ref ref="file"/>
</logger>
<!-- parse error logger -->
<logger name="vertex-parse-error" level="INFO" additivity="false">
<appender-ref ref="vertex-parse-error-file"/>
</logger>
<logger name="edge-parse-error" level="INFO" additivity="false">
<appender-ref ref="edge-parse-error-file"/>
</logger>
<!-- insert error logger -->
<logger name="vertex-insert-error" level="INFO" additivity="false">
<appender-ref ref="vertex-insert-error-file"/>
</logger>
<logger name="edge-insert-error" level="INFO" additivity="false">
<appender-ref ref="edge-insert-error-file"/>
</logger>
</loggers>
</configuration>
@@ -24,7 +24,7 @@
<dependency>
<groupId>com.baidu.hugegraph</groupId>
<artifactId>hugegraph-client</artifactId>
<version>1.7.8</version>
<version>1.7.10</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
@@ -98,7 +98,7 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.18</version>
<version>1.19</version>
</dependency>
<dependency>
<groupId>org.tukaani</groupId>
@@ -32,14 +32,15 @@

import com.baidu.hugegraph.driver.HugeClient;
import com.baidu.hugegraph.loader.builder.ElementBuilder;
import com.baidu.hugegraph.loader.builder.Record;
import com.baidu.hugegraph.loader.constant.Constants;
import com.baidu.hugegraph.loader.constant.ElemType;
import com.baidu.hugegraph.loader.exception.LoadException;
import com.baidu.hugegraph.loader.exception.ParseException;
import com.baidu.hugegraph.loader.executor.FailureLogger;
import com.baidu.hugegraph.loader.executor.GroovyExecutor;
import com.baidu.hugegraph.loader.executor.LoadContext;
import com.baidu.hugegraph.loader.executor.LoadOptions;
import com.baidu.hugegraph.loader.failure.FailureLogger;
import com.baidu.hugegraph.loader.progress.InputProgressMap;
import com.baidu.hugegraph.loader.struct.ElementStruct;
import com.baidu.hugegraph.loader.struct.GraphStruct;
@@ -56,8 +57,6 @@ public final class HugeGraphLoader {

private static final Logger LOG = Log.logger(HugeGraphLoader.class);

private static final FailureLogger FAILURE_LOGGER = FailureLogger.parse();

private final LoadContext context;
private final GraphStruct graphStruct;
private final TaskManager taskManager;
@@ -67,7 +66,7 @@ public static void main(String[] args) {
loader.load();
}

private HugeGraphLoader(String[] args) {
public HugeGraphLoader(String[] args) {
this.context = new LoadContext(args);
this.graphStruct = GraphStruct.of(this.context);
this.taskManager = new TaskManager(this.context);
@@ -76,33 +75,28 @@ private HugeGraphLoader(String[] args) {

private void addShutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
LoadOptions options = this.context.options();
try {
this.context.newProgress().write(options.file);
} catch (IOException e) {
LOG.warn("Failed to write load progress", e);
}
this.stopThenShutdown();
}));
}

private void load() {
LOG.info("Start loading");
public void load() {
try {
// Create schema
this.createSchema();
// Move failure files from current to history directory
LoadUtil.moveFailureFiles(this.context);
// Load vertices
this.load(ElemType.VERTEX);
// Load edges
this.load(ElemType.EDGE);
} catch (Exception e) {
Printer.printError("Failed to load due to " + e.getMessage(), e);
this.taskManager.shutdown();
Printer.printError("Failed to load", e);
this.stopThenShutdown();
throw e;
}

// Print load summary
Printer.printSummary(this.context);
this.stopLoading(Constants.EXIT_CODE_NORM);
this.stopThenShutdown();
}

private void createSchema() {
@@ -113,7 +107,7 @@ private void createSchema() {
File schemaFile = FileUtils.getFile(options.schema);
HugeClient client = HugeClientHolder.get(options);
GroovyExecutor groovyExecutor = new GroovyExecutor();
groovyExecutor.bind("schema", client.schema());
groovyExecutor.bind(Constants.GROOVY_SCHEMA, client.schema());
String script;
try {
script = FileUtils.readFileToString(schemaFile, Constants.CHARSET);
@@ -124,118 +118,140 @@ private void createSchema() {
groovyExecutor.execute(script, client);
}

private long lastLoadedCount(ElemType type) {
if (this.context.options().incrementalMode) {
return this.context.oldProgress().totalLoaded(type);
} else {
return 0L;
}
}

private void load(ElemType type) {
Printer.printRealTimeProgress(type, this.lastLoadedCount(type));
LOG.info("Start loading {}", type.string());
this.context.loadingType(type);

Printer.printRealTimeProgress(type, LoadUtil.lastLoaded(this.context,
type));
LoadOptions options = this.context.options();
LoadSummary summary = this.context.summary();
InputProgressMap newProgress = this.context.newProgress().get(type);
StopWatch totalTime = StopWatch.createStarted();
for (ElementStruct struct : this.graphStruct.structs(type)) {
StopWatch loadTime = StopWatch.createStarted();
StopWatch totalTimer = StopWatch.createStarted();

// Load normal data
this.load(type, this.graphStruct.structs(type));
// Load failure data
if (options.incrementalMode && options.reloadFailure) {
this.load(type, this.graphStruct.structsForFailure(type, options));
}

// Wait for async worker threads to finish
this.taskManager.waitFinished(type);
totalTimer.stop();
summary.totalTime(type, totalTimer.getTime(TimeUnit.MILLISECONDS));

Printer.print(summary.accumulateMetrics(type).loadSuccess());
}

private void load(ElemType type, List<ElementStruct> structs) {
LoadSummary summary = this.context.summary();
InputProgressMap newProgress = this.context.newProgress().type(type);
for (ElementStruct struct : structs) {
StopWatch loadTimer = StopWatch.createStarted();
LoadMetrics metrics = summary.metrics(struct);
// Update loading vertex/edge struct
newProgress.addStruct(struct);
// Produce batch of vertices/edges and execute loading tasks
try (ElementBuilder<?> builder = ElementBuilder.of(this.context,
struct)) {
this.load(builder, this.context.options(), metrics);
if (!this.context.stopped()) {
// Update loading vertex/edge struct
newProgress.addStruct(struct);
// Produce batch of vertices/edges and execute loading tasks
try (ElementBuilder<?> builder = ElementBuilder.of(this.context,
struct)) {
this.load(builder);
}
}

/*
* NOTE: the load tasks of this struct may not have completed yet,
* so the load rate of each struct is only an approximate value.
*/
loadTime.stop();
metrics.loadTime(loadTime.getTime(TimeUnit.MILLISECONDS));
loadTimer.stop();
metrics.loadTime(loadTimer.getTime(TimeUnit.MILLISECONDS));
LOG.info("Loading {} '{}' with average rate: {}/s",
metrics.loadSuccess(), struct, metrics.averageLoadRate());
}
// Wait for async worker threads to finish
this.taskManager.waitFinished(type);
totalTime.stop();
summary.totalTime(type, totalTime.getTime(TimeUnit.MILLISECONDS));

Printer.print(summary.accumulateMetrics(type).loadSuccess());
}

private <GE extends GraphElement> void load(ElementBuilder<GE> builder,
LoadOptions options,
LoadMetrics metrics) {
private <GE extends GraphElement> void load(ElementBuilder<GE> builder) {
ElementStruct struct = builder.struct();
LOG.info("Start parsing and loading '{}'", struct);
StopWatch parseTime = StopWatch.createStarted();
final int batchSize = this.context.options().batchSize;
LoadMetrics metrics = this.context.summary().metrics(struct);

ElemType type = struct.type();
List<GE> batch = new ArrayList<>(options.batchSize);
for (boolean finished = false; !finished;) {
StopWatch parseTimer = StopWatch.createStarted();
List<Record<GE>> batch = new ArrayList<>(batchSize);
for (boolean finished = false; !this.context.stopped() && !finished;) {
try {
if (builder.hasNext()) {
GE element = builder.next();
batch.add(element);
Record<GE> record = builder.next();
batch.add(record);
} else {
finished = true;
}
} catch (ParseException e) {
if (options.testMode) {
throw e;
}
LOG.error("Parse {} error", type, e);

FAILURE_LOGGER.error(type, e);
long failureNum = metrics.increaseParseFailure();
if (failureNum >= options.maxParseErrors) {
Printer.printError("Exceed %s %s parsing error... stopping",
options.maxParseErrors, type);
this.stopLoading(Constants.EXIT_CODE_ERROR);
}
continue;
builder.confirmOffset();
metrics.increaseParseFailure();

this.handleParseFailure(struct, e);
}
if (batch.size() >= options.batchSize ||
(finished && !batch.isEmpty())) {
this.submit(struct, batch, options, metrics, parseTime);
if (!finished) {
batch = new ArrayList<>(options.batchSize);

boolean stopped = this.context.stopped() || finished;
if (batch.size() >= batchSize || stopped) {
this.submit(struct, batch, parseTimer);
metrics.plusParseSuccess(batch.size());
batch = new ArrayList<>(batchSize);

// Confirm the offset to avoid losing records
builder.confirmOffset();
if (stopped) {
this.context.newProgress().markLoaded(struct, finished);
}
}
}

parseTime.stop();
metrics.parseTime(parseTime.getTime(TimeUnit.MILLISECONDS));
parseTimer.stop();
metrics.parseTime(parseTimer.getTime(TimeUnit.MILLISECONDS));
LOG.info("Parsing {} '{}' with average rate: {}/s",
metrics.parseSuccess(), struct, metrics.parseRate());
}

private void handleParseFailure(ElementStruct struct, ParseException e) {
if (this.context.options().testMode) {
throw e;
}

LOG.error("Parse {} error", struct.type(), e);
// Write to current struct's parse failure log
FailureLogger logger = this.context.failureLogger(struct);
logger.write(e);

LoadOptions options = this.context.options();
long failures = this.context.summary().totalParseFailures();
if (failures >= options.maxParseErrors) {
Printer.printError("More than %s %s parsing error, stop parsing " +
"and waiting all insert tasks finished",
options.maxParseErrors, struct.type().string());
this.context.stopLoading();
}
}

private <GE extends GraphElement> void submit(ElementStruct struct,
List<GE> batch,
LoadOptions options,
LoadMetrics metrics,
StopWatch parseTime) {
metrics.plusParseSuccess(batch.size());
if (!options.dryRun) {
List<Record<GE>> batch,
StopWatch parseTimer) {
if (!this.context.options().dryRun && !batch.isEmpty()) {
// Parse time doesn't include submit time, so it stays accurate
parseTime.suspend();
this.taskManager.submitBatch(struct, batch);
parseTime.resume();
parseTimer.suspend();
try {
this.taskManager.submitBatch(struct, batch);
} finally {
parseTimer.resume();
}
}
}

private void stopLoading(int code) {
LOG.info("Stop loading");
// Shutdown task manager
private void stopThenShutdown() {
LOG.info("Stop loading then shutdown HugeGraphLoader");
this.context.stopLoading();
// Wait for all insert tasks to finish before exiting
this.taskManager.waitFinished(this.context.loadingType());
this.taskManager.shutdown();
HugeClientHolder.close();
// Exit JVM if the code is not EXIT_CODE_NORM
if (Constants.EXIT_CODE_NORM != code) {
LoadUtil.exit(code);
}
this.context.close();
}
}
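
The new stopThenShutdown() path (also invoked from the shutdown hook) first flags the context as stopped so the parse loop exits, then waits for the in-flight insert tasks of the current element type before shutting down the task manager and closing the client. A rough standalone sketch of that ordering with a plain ExecutorService; the class and field names are illustrative only, not the loader's real implementation.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class StopThenShutdownSketch {

    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final ExecutorService insertWorkers = Executors.newFixedThreadPool(2);

    public void submit(Runnable batchTask) {
        // Once stopped, no new batches are accepted
        if (!this.stopped.get()) {
            this.insertWorkers.submit(batchTask);
        }
    }

    public void stopThenShutdown() throws InterruptedException {
        // 1. Mark the context as stopped so the parse loop exits and no new
        //    batches are produced
        this.stopped.set(true);
        // 2. Let already submitted insert tasks run to completion instead of
        //    dropping them
        this.insertWorkers.shutdown();
        this.insertWorkers.awaitTermination(1, TimeUnit.MINUTES);
        // 3. A real loader would then also close its client and flush/close
        //    progress and failure files
    }

    public static void main(String[] args) throws InterruptedException {
        StopThenShutdownSketch loader = new StopThenShutdownSketch();
        loader.submit(() -> System.out.println("insert batch 1"));
        loader.submit(() -> System.out.println("insert batch 2"));
        loader.stopThenShutdown();
    }
}
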
