Skip to content
Permalink
Browse files
Fix load time and rate of vertex/edge is inaccurate (#200)
* install HugeGraphServer from specified commit id

Change-Id: I6ec5e1d6d17d91d7f4f0e7db766d21056706f269
  • Loading branch information
Linary committed Apr 8, 2021
1 parent cf81bb2 commit 2dcbf15db24a8f140832c466068a172f7871f0ee
Showing 12 changed files with 275 additions and 17 deletions.
@@ -50,3 +50,4 @@ env:
- TRAVIS_DIR=assembly/travis
- STATIC_DIR=assembly/static
- SERVER_VERSION=0.11.2
- COMMIT_ID=c0dff5b233e853716ca6f3f28a5cda05e6f3d639
@@ -11,9 +11,9 @@ CLIENT_BRANCH=$1
HUGEGRAPH_BRANCH=${CLIENT_BRANCH}
HUGEGRAPH_GIT_URL="https://github.com/hugegraph/hugegraph.git"

git clone --depth 1 ${HUGEGRAPH_GIT_URL}
git clone --depth 100 ${HUGEGRAPH_GIT_URL}
cd hugegraph
git checkout ${HUGEGRAPH_BRANCH}
git checkout ${COMMIT_ID}
mvn package -DskipTests
mv hugegraph-*.tar.gz ../
cd ../
@@ -32,6 +32,7 @@
import com.baidu.hugegraph.driver.HugeClient;
import com.baidu.hugegraph.loader.builder.Record;
import com.baidu.hugegraph.loader.constant.Constants;
import com.baidu.hugegraph.loader.constant.ElemType;
import com.baidu.hugegraph.loader.exception.InitException;
import com.baidu.hugegraph.loader.exception.LoadException;
import com.baidu.hugegraph.loader.exception.ReadException;
@@ -42,6 +43,7 @@
import com.baidu.hugegraph.loader.mapping.InputStruct;
import com.baidu.hugegraph.loader.mapping.LoadMapping;
import com.baidu.hugegraph.loader.metrics.LoadMetrics;
import com.baidu.hugegraph.loader.metrics.LoadSummary;
import com.baidu.hugegraph.loader.reader.InputReader;
import com.baidu.hugegraph.loader.reader.line.Line;
import com.baidu.hugegraph.loader.task.ParseTaskBuilder;
@@ -162,8 +164,9 @@ private void loadInputs() {
LOG.info("Start loading");
Printer.printRealtimeProgress(this.context);
LoadOptions options = this.context.options();
LoadSummary summary = this.context.summary();

this.context.summary().startTimer();
summary.startTotalTimer();
try {
if (!options.failureMode) {
// Load normal data from user supplied input structs
@@ -175,7 +178,9 @@ private void loadInputs() {
// Waiting for async worker threads finish
this.manager.waitFinished();
} finally {
this.context.summary().stopTimer();
summary.stopFlowRangeTimer(ElemType.VERTEX);
summary.stopFlowRangeTimer(ElemType.EDGE);
summary.stopTotalTimer();
}
Printer.printFinalProgress(this.context);
}
@@ -258,7 +263,11 @@ private void load(InputStruct struct, InputReader reader) {
*/
private void executeParseTask(InputStruct struct, ElementMapping mapping,
ParseTaskBuilder.ParseTask task) {
long start = System.currentTimeMillis();
List<List<Record>> batches = task.get();
long end = System.currentTimeMillis();
this.context.summary().addTimeRange(mapping.type(), start, end);

if (this.context.options().dryRun || CollectionUtils.isEmpty(batches)) {
return;
}
@@ -34,13 +34,21 @@ public final class LoadSummary {
private final LongAdder vertexLoaded;
private final LongAdder edgeLoaded;
private final StopWatch totalTimer;
private final LongAdder vertexTime;
private final LongAdder edgeTime;
private final RangesTimer vertexRangesTimer;
private final RangesTimer edgeRangesTimer;
// Every input struct has a metric
private final Map<String, LoadMetrics> inputMetricsMap;

public LoadSummary() {
this.vertexLoaded = new LongAdder();
this.edgeLoaded = new LongAdder();
this.totalTimer = new StopWatch();
this.vertexTime = new LongAdder();
this.edgeTime = new LongAdder();
this.vertexRangesTimer = new RangesTimer(10000);
this.edgeRangesTimer = new RangesTimer(10000);
this.inputMetricsMap = InsertionOrderUtil.newMap();
}

@@ -103,29 +111,50 @@ public long totalInsertFailures() {
.reduce(0L, Long::sum);
}

public void addTimeRange(ElemType type, long start, long end) {
RangesTimer timer = type.isVertex() ? this.vertexRangesTimer :
this.edgeRangesTimer;
timer.addTimeRange(start, end);
}

public void stopFlowRangeTimer(ElemType type) {
RangesTimer timer = type.isVertex() ? this.vertexRangesTimer :
this.edgeRangesTimer;
LongAdder elemTime = type.isVertex() ? this.vertexTime : this.edgeTime;
elemTime.add(timer.totalTime());
}

public long totalTime() {
return this.totalTimer.getTime();
}

public void startTimer() {
public long vertexTime() {
return this.vertexTime.longValue();
}

public long edgeTime() {
return this.edgeTime.longValue();
}

public void startTotalTimer() {
if (!this.totalTimer.isStarted()) {
this.totalTimer.start();
}
}

public void stopTimer() {
public void stopTotalTimer() {
if (!this.totalTimer.isStopped()) {
this.totalTimer.stop();
}
}

public long loadRate(ElemType type) {
long totalTime = this.totalTime();
boolean isVertex = type.isVertex();
long totalTime = isVertex ? this.vertexTime() : this.edgeTime();
if (totalTime == 0) {
return -1;
}
long success = type.isVertex() ? this.vertexLoaded.longValue() :
this.edgeLoaded.longValue();
long success = isVertex ? this.vertexLoaded() : this.edgeLoaded();
return success * 1000 / totalTime;
}
}
@@ -0,0 +1,130 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.loader.metrics;

import java.util.ArrayList;
import java.util.List;

/**
* RangesTimer is a customized timer used to count the total time consumption
* of multiple time ranges of multiple threads, each TimeRange represents a
* time section, it means that a thread performs certain tasks during this time
* section, such as parsing or loading.
*
* <pre>
* TimeLine 1__2__3__4__5__6__7__8
*
* Thread-1 1_____3
* Thread-2 2_____4
* Thread-3 5_____7
* Thread-4 5__6
*
* occupancy |========| |=====|
* 3 + 2
* </pre>
*
* It's thread safe
*/
public class RangesTimer {

private final int capacity;
private final List<TimeRange> ranges;
private long lastEnd;
private long totalTime;

public RangesTimer(int capacity) {
this.capacity = capacity;
this.ranges = new ArrayList<>(capacity);
this.lastEnd = 0L;
this.totalTime = 0L;
}

public synchronized long totalTime() {
if (!this.ranges.isEmpty()) {
long time = this.caculate();
this.totalTime += time;
this.ranges.clear();
}
return this.totalTime;
}

public synchronized void addTimeRange(long start, long end) {
if (this.ranges.size() >= this.capacity) {
long time = this.caculate();
this.totalTime += time;
this.ranges.clear();
}
this.ranges.add(new TimeRange(start, end));
}

private long caculate() {
assert !this.ranges.isEmpty();
this.ranges.sort((o1, o2) -> (int) (o1.start() - o2.start()));
long time = 0L;
long start = this.lastEnd;
long end = this.lastEnd;
for (TimeRange range : this.ranges) {
if (range.start() <= end) {
/*
* There is overlap, merging range
*
* Thread-1 1_____3
* Thread-2 2_____4
* Thread-3 3_____5
*
* The 'end' is updated to 3->4->5, the range expand to [1, 5]
*/
end = Math.max(end, range.end());
} else {
/*
* There is no overlap, calculate the length of the old range
* then open up a new range
*
* Thread-4 6_____8
*/
time += (end - start);
start = range.start();
end = range.end();
}
this.lastEnd = Math.max(this.lastEnd, end);
}
time += (end - start);
return time;
}

private static class TimeRange {

private final long start;
private final long end;

public TimeRange(long start, long end) {
this.start = start;
this.end = end;
}

public long start() {
return this.start;
}

public long end() {
return this.end;
}
}
}
@@ -46,7 +46,7 @@ public BatchInsertTask(LoadContext context, InputStruct struct,
}

@Override
public void run() {
public void execute() {
int retryCount = 0;
do {
try {
@@ -83,6 +83,19 @@ public LoadMetrics metrics() {
return this.summary().metrics(this.struct);
}

@Override
public void run() {
long start = System.currentTimeMillis();
try {
this.execute();
} finally {
long end = System.currentTimeMillis();
this.context.summary().addTimeRange(this.type(), start, end);
}
}

public abstract void execute();

protected void plusLoadSuccess(int count) {
LoadMetrics metrics = this.summary().metrics(this.struct);
metrics.plusInsertSuccess(this.mapping, count);
@@ -46,7 +46,7 @@ public SingleInsertTask(LoadContext context, InputStruct struct,
}

@Override
public void run() {
public void execute() {
for (Record record : this.batch) {
try {
if (this.mapping.updateStrategies().isEmpty()) {
@@ -135,6 +135,7 @@ public void shutdown() {

public void submitBatch(InputStruct struct, ElementMapping mapping,
List<Record> batch) {
long start = System.currentTimeMillis();
try {
this.batchSemaphore.acquire();
} catch (InterruptedException e) {
@@ -147,13 +148,19 @@ public void submitBatch(InputStruct struct, ElementMapping mapping,
CompletableFuture.runAsync(task, this.batchService).exceptionally(e -> {
LOG.warn("Batch insert {} error, try single insert",
mapping.type(), e);
// The time of single insert is counted separately in this method
this.submitInSingle(struct, mapping, batch);
return null;
}).whenComplete((r, e) -> this.batchSemaphore.release());
}).whenComplete((r, e) -> {
this.batchSemaphore.release();
long end = System.currentTimeMillis();
this.context.summary().addTimeRange(mapping.type(), start, end);
});
}

private void submitInSingle(InputStruct struct, ElementMapping mapping,
List<Record> batch) {
long start = System.currentTimeMillis();
try {
this.singleSemaphore.acquire();
} catch (InterruptedException e) {
@@ -163,9 +170,11 @@ private void submitInSingle(InputStruct struct, ElementMapping mapping,

InsertTask task = new SingleInsertTask(this.context, struct,
mapping, batch);
CompletableFuture.runAsync(task, this.singleService)
.whenComplete((r, e) -> {
this.singleSemaphore.release();
});
CompletableFuture.runAsync(task, this.singleService).whenComplete(
(r, e) -> {
this.singleSemaphore.release();
long end = System.currentTimeMillis();
this.context.summary().addTimeRange(mapping.type(), start, end);
});
}
}
@@ -116,8 +116,10 @@ private static void printCountReport(LoadReport report) {
private static void printMeterReport(LoadSummary summary) {
printAndLog("meter metrics");
printAndLog("total time", TimeUtil.readableTime(summary.totalTime()));
printAndLog("vertex time", TimeUtil.readableTime(summary.vertexTime()));
printAndLog("vertex load rate(vertices/s)",
summary.loadRate(ElemType.VERTEX));
printAndLog("edge time", TimeUtil.readableTime(summary.edgeTime()));
printAndLog("edge load rate(edges/s)", summary.loadRate(ElemType.EDGE));
}

0 comments on commit 2dcbf15

Please sign in to comment.