Improve some code (#96)
Change-Id: Ic7addbf75b324c931aeeca7f244e993547d136d1
Linary authored and javeme committed Sep 3, 2019
1 parent ae1a601 commit 9a286fdee9213a1b38e9d2a23160493cdd60cf73
Showing 20 changed files with 54 additions and 46 deletions.
@@ -46,7 +46,7 @@
import com.baidu.hugegraph.loader.summary.LoadMetrics;
import com.baidu.hugegraph.loader.summary.LoadSummary;
import com.baidu.hugegraph.loader.task.TaskManager;
- import com.baidu.hugegraph.loader.util.HugeClientWrapper;
+ import com.baidu.hugegraph.loader.util.HugeClientHolder;
import com.baidu.hugegraph.loader.util.LoadUtil;
import com.baidu.hugegraph.loader.util.Printer;
import com.baidu.hugegraph.structure.GraphElement;
@@ -111,7 +111,7 @@ private void createSchema() {
return;
}
File schemaFile = FileUtils.getFile(options.schema);
- HugeClient client = HugeClientWrapper.get(options);
+ HugeClient client = HugeClientHolder.get(options);
GroovyExecutor groovyExecutor = new GroovyExecutor();
groovyExecutor.bind("schema", client.schema());
String script;
@@ -232,7 +232,7 @@ private void stopLoading(int code) {
LOG.info("Stop loading");
// Shutdown task manager
this.taskManager.shutdown();
- HugeClientWrapper.close();
+ HugeClientHolder.close();
// Exit JVM if the code is not EXIT_CODE_NORM
if (Constants.EXIT_CODE_NORM != code) {
LoadUtil.exit(code);
@@ -24,7 +24,7 @@

import com.baidu.hugegraph.driver.HugeClient;
import com.baidu.hugegraph.loader.executor.LoadContext;
- import com.baidu.hugegraph.loader.util.HugeClientWrapper;
+ import com.baidu.hugegraph.loader.util.HugeClientHolder;
import com.baidu.hugegraph.structure.schema.EdgeLabel;
import com.baidu.hugegraph.structure.schema.PropertyKey;
import com.baidu.hugegraph.structure.schema.VertexLabel;
@@ -37,7 +37,7 @@ public final class SchemaCache {
private final Map<String, EdgeLabel> edgeLabels;

public SchemaCache(LoadContext context) {
- this.client = HugeClientWrapper.get(context.options());
+ this.client = HugeClientHolder.get(context.options());
this.propertyKeys = new HashMap<>();
this.vertexLabels = new HashMap<>();
this.edgeLabels = new HashMap<>();
@@ -30,16 +30,19 @@ public final class Constants {

public static final Charset CHARSET = Charsets.UTF_8;

- public static final String EMPTY = "";
- public static final String CSV_DELIMITER = ",";
- public static final String TEXT_DELIMITER = "\t";
+ public static final String EMPTY_STR = "";
+ public static final String DOT_STR = ".";
+ public static final String MINUS_STR = "-";
+ public static final String COMMA_STR = ",";
+ public static final String TAB_STR = "\t";
+ public static final String NULL_STR = "NULL";
public static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
public static final String SKIPPED_LINE_REGEX = "";

- public static final long BATCH_PRINT_FREQ = 10_000_000L;
- public static final long SINGLE_PRINT_FREQ = 10_000L;
public static final String BATCH_WORKER = "batch-worker-%d";
public static final String SINGLE_WORKER = "single-worker-%d";
+ public static final long BATCH_PRINT_FREQ = 10_000_000L;
+ public static final long SINGLE_PRINT_FREQ = 10_000L;

public static final int VERTEX_ID_LIMIT = 128;
public static final String[] SEARCH_LIST = new String[]{":", "!"};
@@ -32,7 +32,7 @@ public class CsvLineParser extends TextLineParser {
private final CSVParser parser;

public CsvLineParser(FileSource source) {
- super(source, Constants.CSV_DELIMITER);
+ super(source, Constants.COMMA_STR);
char separator = this.delimiter().charAt(0);
this.parser = new CSVParserBuilder().withSeparator(separator)
.withIgnoreQuotations(false)
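Note: CsvLineParser hands the comma delimiter (now Constants.COMMA_STR) to its parent and builds the opencsv parser from it. A self-contained sketch of the same opencsv calls, using a made-up input line rather than anything from the loader, looks like this:

```java
import java.io.IOException;

import com.opencsv.CSVParser;
import com.opencsv.CSVParserBuilder;

public class CsvParseSketch {

    public static void main(String[] args) throws IOException {
        // Same builder calls as CsvLineParser: single-char separator,
        // quotations still honored
        CSVParser parser = new CSVParserBuilder().withSeparator(',')
                                                 .withIgnoreQuotations(false)
                                                 .build();
        // A quoted field keeps its embedded comma
        String[] columns = parser.parseLine("marko,29,\"Beijing, China\"");
        for (String column : columns) {
            System.out.println(column);
        }
    }
}
```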
@@ -38,7 +38,7 @@ public class TextLineParser implements LineParser {
public TextLineParser(FileSource source) {
this(source, source.delimiter() != null ?
source.delimiter() :
- Constants.TEXT_DELIMITER);
+ Constants.TAB_STR);
}

public TextLineParser(FileSource source, String delimiter) {
@@ -77,7 +77,7 @@ public Line parse(String line) {
String[] supColumns = new String[this.header.length];
System.arraycopy(columns, 0, supColumns, 0, columns.length);
Arrays.fill(supColumns, columns.length, supColumns.length,
- Constants.EMPTY);
+ Constants.EMPTY_STR);
return new Line(line, this.header, supColumns);
}
return new Line(line, this.header, columns);
@@ -123,7 +123,7 @@ public String[] split(String line) {
private boolean tailColumnEmpty(String[] columns, int count) {
for (int i = 0; i < count; i++) {
int tailIdx = columns.length - 1 - i;
- if (!columns[tailIdx].equals(Constants.EMPTY)) {
+ if (!columns[tailIdx].equals(Constants.EMPTY_STR)) {
return false;
}
}
@@ -99,8 +99,8 @@ public static LoadProgress read(String structFileName) throws IOException {
}

private static String getProgressFileName(String structFileName) {
- int lastDotIdx = structFileName.lastIndexOf(".");
+ int lastDotIdx = structFileName.lastIndexOf(Constants.DOT_STR);
String prefix = structFileName.substring(0, lastDotIdx);
- return prefix + "-" + SERIALIZE_FILE;
+ return prefix + Constants.MINUS_STR + SERIALIZE_FILE;
}
}
@@ -92,6 +92,6 @@ public void retainAll(String[] names) {

@Override
public String toString() {
- return this.rawLine();
+ return this.rawLine;
}
}
@@ -69,7 +69,7 @@ public void init(LoadContext context, ElementStruct struct) {
this.readers = this.openReaders();
} catch (IOException e) {
throw new LoadException("Failed to open readers for struct '%s'",
- struct);
+ e, struct);
}
this.progress(context, struct);

@@ -122,7 +122,7 @@ public String skipOffset(boolean needHeader) {
} catch (IOException e) {
throw new LoadException("Failed to skip the first %s lines " +
"of file %s, please ensure the file " +
- "must have at least %s lines", offset,
+ "must have at least %s lines", e, offset,
this.readables.get(this.index), offset);
}
this.newProgress.addLoadingOffset(offset);
@@ -180,7 +180,7 @@ private BufferedReader openReader(Readable readable) {
}
}
throw new LoadException("Failed to create reader for '%s'",
- readable);
+ e, readable);
}
}

@@ -134,7 +134,7 @@ private static void checkExist(FileSystem fs, Path path) {
}
} catch (IOException e) {
throw new LoadException("An exception occurred while checking " +
- "HDFS path: '%s'", path);
+ "HDFS path: '%s'", e, path);
}
}
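Note: this hunk and the neighbouring ones pass the caught exception e into LoadException, so the wrapper keeps the root cause instead of swallowing it. The diff does not show LoadException itself; a minimal sketch of an exception type matching these call sites (format string, then cause, then format arguments), which is an assumption rather than the loader's actual class, could be:

```java
// Hypothetical sketch only; the real com.baidu.hugegraph.loader.exception.LoadException
// may be implemented differently.
public class FormattedLoadException extends RuntimeException {

    private static final long serialVersionUID = 1L;

    public FormattedLoadException(String message, Object... args) {
        super(String.format(message, args));
    }

    public FormattedLoadException(String message, Throwable cause, Object... args) {
        // Keeping the cause means the original stack trace stays visible
        // when this wrapper is logged or printed
        super(String.format(message, args), cause);
    }
}
```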

@@ -27,9 +27,11 @@
import java.util.ArrayList;
import java.util.List;

+ import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;

+ import com.baidu.hugegraph.loader.constant.Constants;
import com.baidu.hugegraph.loader.exception.LoadException;
import com.baidu.hugegraph.loader.reader.Line;
import com.baidu.hugegraph.loader.source.jdbc.JDBCSource;
@@ -87,7 +89,7 @@ public void readHeader() throws SQLException {
this.close();
throw e;
}
- E.checkArgument(this.columns != null && this.columns.length != 0,
+ E.checkArgument(ArrayUtils.isNotEmpty(this.columns),
"The colmuns of the table '%s' shouldn't be empty",
this.source.table());
}
@@ -106,7 +108,7 @@ public void readPrimaryKey() throws SQLException {
this.close();
throw e;
}
- E.checkArgument(this.primaryKeys != null && this.primaryKeys.length != 0,
+ E.checkArgument(ArrayUtils.isNotEmpty(this.primaryKeys),
"The primary keys of the table '%s' shouldn't be empty",
this.source.table());
}
@@ -126,11 +128,11 @@ public List<Line> nextBatch() throws SQLException {
for (int i = 1, n = this.columns.length; i <= n; i++) {
Object value = result.getObject(i);
if (value == null) {
- value = "NULL";
+ value = Constants.NULL_STR;
}
values[i - 1] = value;
}
- String rawLine = StringUtils.join(values, ",");
+ String rawLine = StringUtils.join(values, Constants.COMMA_STR);
Line line = new Line(rawLine, this.columns, values);
batch.add(line);
}
@@ -67,9 +67,9 @@ public SourceType type() {
public void check() throws IllegalArgumentException {
if (this.format == FileFormat.CSV) {
E.checkArgument(this.delimiter == null ||
- this.delimiter.equals(Constants.CSV_DELIMITER),
+ this.delimiter.equals(Constants.COMMA_STR),
"The delimiter must be '%s' when file format " +
- "is %s, but got '%s'", Constants.CSV_DELIMITER,
+ "is %s, but got '%s'", Constants.COMMA_STR,
this.format, this.delimiter);
}
String elemDelimiter = this.listFormat().elemDelimiter();
@@ -43,7 +43,7 @@ public String regex() {

private Matcher matcher() {
if (this.matcher == null) {
- this.matcher = Pattern.compile(this.regex).matcher(Constants.EMPTY);
+ this.matcher = Pattern.compile(this.regex).matcher(Constants.EMPTY_STR);
}
return this.matcher;
}
@@ -21,6 +21,7 @@

import org.apache.http.client.utils.URIBuilder;

+ import com.baidu.hugegraph.loader.constant.Constants;
import com.baidu.hugegraph.loader.reader.Line;
import com.baidu.hugegraph.loader.reader.jdbc.JDBCUtil;
import com.baidu.hugegraph.util.E;
@@ -148,7 +149,6 @@ public String buildUrl(JDBCSource source) {
}

/**
- * TODO: Not related to schema?
* NOTE: don't add an semicolon(;) at the end of oracle sql
*/
@Override
@@ -294,6 +294,7 @@ public String buildUrl(JDBCSource source) {
URIBuilder uriBuilder = new URIBuilder();
uriBuilder.setPath(url)
.setParameter("useSSL", "false")
+ .setParameter("characterEncoding", Constants.CHARSET.name())
.setParameter("rewriteBatchedStatements", "true")
.setParameter("useServerPrepStmts", "false")
.setParameter("autoReconnect", "true");
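Note: the only change in this hunk is the extra characterEncoding parameter, pinned to Constants.CHARSET (UTF-8). A plain-string sketch of the resulting MySQL URL shape (placeholder host, port, and database; the loader itself builds this with URIBuilder as above) would be:

```java
import java.nio.charset.StandardCharsets;
import java.util.StringJoiner;

public class MysqlUrlSketch {

    public static void main(String[] args) {
        // Placeholder endpoint; the loader derives host, port and database
        // from its JDBC source configuration
        String base = "jdbc:mysql://127.0.0.1:3306/example_db";
        StringJoiner params = new StringJoiner("&", "?", "");
        params.add("useSSL=false");
        // The option added in this commit: fix the connection encoding to
        // UTF-8 instead of relying on the driver or server default
        params.add("characterEncoding=" + StandardCharsets.UTF_8.name());
        params.add("rewriteBatchedStatements=true");
        params.add("useServerPrepStmts=false");
        params.add("autoReconnect=true");
        // Prints jdbc:mysql://127.0.0.1:3306/example_db?useSSL=false&characterEncoding=UTF-8&...
        System.out.println(base + params);
    }
}
```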
@@ -59,7 +59,7 @@ public ElementStruct() {
this.mappingValues = new HashMap<>();
this.selectedFields = new HashSet<>();
this.ignoredFields = new HashSet<>();
- this.nullValues = ImmutableSet.of(Constants.EMPTY);
+ this.nullValues = ImmutableSet.of(Constants.EMPTY_STR);
this.uniqueKey = null;
}

@@ -69,7 +69,7 @@ public ElementStruct() {
public String uniqueKey() {
if (this.uniqueKey == null) {
String code = HashUtil.hash(JsonUtil.toJson(this));
- this.uniqueKey = this.label + "-" + code;
+ this.uniqueKey = this.label + Constants.MINUS_STR + code;
}
return this.uniqueKey;
}
@@ -32,7 +32,7 @@
import com.baidu.hugegraph.loader.executor.LoadOptions;
import com.baidu.hugegraph.loader.struct.ElementStruct;
import com.baidu.hugegraph.loader.summary.LoadMetrics;
- import com.baidu.hugegraph.loader.util.HugeClientWrapper;
+ import com.baidu.hugegraph.loader.util.HugeClientHolder;
import com.baidu.hugegraph.loader.util.Printer;
import com.baidu.hugegraph.rest.ClientException;
import com.baidu.hugegraph.structure.GraphElement;
@@ -59,8 +59,10 @@ public void run() {
this.addBatch(type, this.batch(), options.checkVertex);
break;
} catch (ClientException e) {
+ LOG.debug("client exception: {}", e.getMessage());
retryCount = this.waitThenRetry(retryCount, e);
} catch (ServerException e) {
+ LOG.debug("server exception: {}", e.getMessage());
if (UNACCEPTABLE_EXCEPTIONS.contains(e.exception())) {
throw e;
}
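Note: the two new LOG.debug lines record why a batch failed before the worker decides between retrying and rethrowing. A generic sketch of that retry shape (plain RuntimeException here; the real worker distinguishes ClientException from ServerException and checks UNACCEPTABLE_EXCEPTIONS) might be:

```java
import java.util.concurrent.Callable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RetrySketch {

    private static final Logger LOG = LoggerFactory.getLogger(RetrySketch.class);

    // Run the task, log each failure at debug level, retry up to maxRetries
    // times with a fixed pause, then give up by rethrowing the last error
    public static <V> V callWithRetry(Callable<V> task, int maxRetries,
                                      long backoffMillis) throws Exception {
        int retryCount = 0;
        while (true) {
            try {
                return task.call();
            } catch (RuntimeException e) {
                LOG.debug("attempt failed: {}", e.getMessage());
                if (++retryCount > maxRetries) {
                    throw e;
                }
                Thread.sleep(backoffMillis);
            }
        }
    }
}
```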
@@ -77,7 +79,7 @@ public void run() {

@SuppressWarnings("unchecked")
private void addBatch(ElemType type, List<GE> elements, boolean check) {
- HugeClient client = HugeClientWrapper.get(this.context().options());
+ HugeClient client = HugeClientHolder.get(this.context().options());
if (type.isVertex()) {
client.graph().addVertices((List<Vertex>) elements);
} else {
@@ -34,7 +34,7 @@
import com.baidu.hugegraph.loader.executor.LoadOptions;
import com.baidu.hugegraph.loader.struct.ElementStruct;
import com.baidu.hugegraph.loader.summary.LoadMetrics;
- import com.baidu.hugegraph.loader.util.HugeClientWrapper;
+ import com.baidu.hugegraph.loader.util.HugeClientHolder;
import com.baidu.hugegraph.loader.util.LoadUtil;
import com.baidu.hugegraph.loader.util.Printer;
import com.baidu.hugegraph.structure.GraphElement;
@@ -60,7 +60,7 @@ public void run() {
LoadMetrics metrics = this.context().summary().metrics(this.struct());
for (GE element : this.batch()) {
try {
- addSingle(type, element);
+ this.addSingle(type, element);
metrics.increaseLoadSuccess();
} catch (Exception e) {
metrics.increaseLoadFailure();
@@ -82,7 +82,7 @@ public void run() {
}

private void addSingle(ElemType type, GE element) {
- HugeClient client = HugeClientWrapper.get(this.context().options());
+ HugeClient client = HugeClientHolder.get(this.context().options());
if (type.isVertex()) {
client.graph().addVertex((Vertex) element);
} else {
@@ -258,13 +258,11 @@ private static boolean checkDataType(Object value, DataType dataType) {
*/
private static boolean checkCollectionDataType(Collection<?> values,
DataType dataType) {
- boolean valid = true;
for (Object value : values) {
if (!checkDataType(value, dataType)) {
- valid = false;
- break;
+ return false;
}
}
- return valid;
+ return true;
}
}
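Note: the refactor replaces the valid flag with an early return; the behaviour is unchanged, it just exits on the first element that fails the check. For the same semantics in stream form (checkDataType stubbed here, since the real one also takes a DataType):

```java
import java.util.Arrays;
import java.util.Collection;

public class AllMatchSketch {

    // Stand-in for the loader's checkDataType(value, dataType)
    private static boolean checkDataType(Object value) {
        return value != null;
    }

    // Same result as the refactored loop: false on the first failing element,
    // true when every element passes (including an empty collection)
    private static boolean checkCollection(Collection<?> values) {
        return values.stream().allMatch(AllMatchSketch::checkDataType);
    }

    public static void main(String[] args) {
        System.out.println(checkCollection(Arrays.asList(1, "a", 2.0))); // true
        System.out.println(checkCollection(Arrays.asList(1, null)));     // false
    }
}
```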
@@ -22,13 +22,13 @@
import com.baidu.hugegraph.driver.HugeClient;
import com.baidu.hugegraph.loader.executor.LoadOptions;

- public final class HugeClientWrapper {
+ public final class HugeClientHolder {

private static volatile HugeClient instance;

public static HugeClient get(LoadOptions options) {
if (instance == null) {
- synchronized(HugeClientWrapper.class) {
+ synchronized(HugeClientHolder.class) {
if (instance == null) {
instance = newHugeClient(options);
}
@@ -37,7 +37,7 @@ public static HugeClient get(LoadOptions options) {
return instance;
}

- private HugeClientWrapper() {}
+ private HugeClientHolder() {}

private static HugeClient newHugeClient(LoadOptions options) {
String address = options.host + ":" + options.port;
@@ -55,7 +55,7 @@ private static HugeClient newHugeClient(LoadOptions options) {
}

public static void close() {
- synchronized(HugeClientWrapper.class) {
+ synchronized(HugeClientHolder.class) {
if (instance != null) {
instance.close();
instance = null;
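Note: apart from the rename, HugeClientHolder keeps the same lazy-singleton shape it had as HugeClientWrapper: a volatile instance plus double-checked locking, so the synchronized block is only entered while the client is still unbuilt, and close() resets the instance under the same lock. A self-contained sketch of that pattern with a placeholder resource instead of HugeClient:

```java
public final class LazyHolderSketch {

    // volatile ensures a fully constructed Resource is visible to threads
    // that pass the first, unsynchronized null check
    private static volatile Resource instance;

    private LazyHolderSketch() {}

    public static Resource get() {
        if (instance == null) {
            synchronized (LazyHolderSketch.class) {
                if (instance == null) {
                    instance = new Resource();
                }
            }
        }
        return instance;
    }

    public static void close() {
        synchronized (LazyHolderSketch.class) {
            if (instance != null) {
                instance.close();
                instance = null;
            }
        }
    }

    // Placeholder for the expensive-to-build, closeable resource
    static class Resource {
        void close() {}
    }
}
```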
@@ -48,7 +48,8 @@ public void connect() {
this.conn = DriverManager.getConnection(this.url, this.user,
this.pass);
} catch (ClassNotFoundException e) {
- throw new LoadException("Invalid driver class '%s'", this.driver);
+ throw new LoadException("Invalid driver class '%s'",
+ e, this.driver);
} catch (SQLException e) {
throw new LoadException("Failed to connect database via '%s'",
e, this.url);
@@ -62,7 +63,8 @@ public void connect(String database) {
Class.forName(this.driver);
this.conn = DriverManager.getConnection(url, this.user, this.pass);
} catch (ClassNotFoundException e) {
- throw new LoadException("Invalid driver class '%s'", this.driver);
+ throw new LoadException("Invalid driver class '%s'",
+ e, this.driver);
} catch (SQLException e) {
throw new LoadException("Failed to connect database via '%s'",
e, this.url);
