Skip to content

Commit

Permalink
Support read header in different releation database (#79)
Browse files Browse the repository at this point in the history
Implement #77
Change-Id: I83bb8ea8652463ceeba520ae3f4a76d169ecae83
  • Loading branch information
Linary authored and javeme committed Aug 9, 2019
1 parent a9f1d01 commit 8698380
Show file tree
Hide file tree
Showing 13 changed files with 542 additions and 314 deletions.
4 changes: 4 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ branches:
- /^release-.*$/
- /^test-.*$/

# Fix can't download oracle jdbc driver from maven respority
before_install: mvn install:install-file -Dfile=$STATIC_DIR/lib/ojdbc8-12.2.0.1.jar -DgroupId=com.oracle -DartifactId=ojdbc8 -Dversion=12.2.0.1 -Dpackaging=jar

install: mvn compile -Dmaven.javadoc.skip=true | grep -v "Downloading\|Downloaded"

before_script:
Expand Down Expand Up @@ -43,3 +46,4 @@ env:
- SOURCE_TYPE=jdbc
global:
- TRAVIS_DIR=assembly/travis
- STATIC_DIR=assembly/static
Binary file added assembly/static/lib/ojdbc8-12.2.0.1.jar
Binary file not shown.
15 changes: 15 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,21 @@
<artifactId>mysql-connector-java</artifactId>
<version>6.0.6</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.5</version>
</dependency>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<version>7.2.0.jre8</version>
</dependency>
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc8</artifactId>
<version>12.2.0.1</version>
</dependency>
</dependencies>

<profiles>
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/com/baidu/hugegraph/loader/reader/Line.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package com.baidu.hugegraph.loader.reader;

import java.util.Arrays;
import java.util.Map;

import com.baidu.hugegraph.util.InsertionOrderUtil;
Expand Down Expand Up @@ -76,13 +77,19 @@ public Map<String, Object> toMap() {
if (this.keyValues != null) {
return this.keyValues;
}
String[] names = this.names();
Object[] values = this.values();
this.keyValues = InsertionOrderUtil.newMap();
for (int i = 0, n = names.length; i < n; i++) {
this.keyValues.put(names[i], values[i]);
}
return this.keyValues;
}

public void retainAll(String[] names) {
this.toMap().keySet().retainAll(Arrays.asList(names));
}

@Override
public String toString() {
return this.rawLine();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ public JDBCSource source() {
public void init(LoadContext context, ElementStruct struct) {
try {
this.fetcher.readHeader();
this.fetcher.readPrimaryKey();
} catch (SQLException e) {
throw new LoadException("Failed to read column names", e);
throw new LoadException("Failed to fetch table structure info", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@

package com.baidu.hugegraph.loader.reader.jdbc;

public final class MysqlUtil {
import java.sql.SQLException;

public static String escapeString(String value) {
import org.postgresql.core.Utils;

import com.baidu.hugegraph.loader.exception.LoadException;

public final class JDBCUtil {

public static String escapeMysql(String value) {
int length = value.length();
if (!isEscapeNeededForString(value, length)) {
StringBuilder buf = new StringBuilder(length + 2);
buf.append('\'').append(value).append('\'');
return buf.toString();
return '\'' + value + '\'';
}

StringBuilder buf = new StringBuilder((int) (length * 1.1D));
Expand Down Expand Up @@ -76,9 +80,30 @@ public static String escapeString(String value) {
return buf.toString();
}

public static boolean isEscapeNeededForString(String sql, int length) {
boolean needsEscape = false;
public static String escapePostgresql(String value) {
StringBuilder builder = new StringBuilder(8 + value.length());
builder.append('\'');
try {
Utils.escapeLiteral(builder, value, false);
} catch (SQLException e) {
throw new LoadException("Failed to escape '%s'", e, value);
}
builder.append('\'');
return builder.toString();
}

public static String escapeOracle(String value) {
// TODO: check it
return escapeMysql(value);
}

public static String escapeSqlserver(String value) {
// TODO: check it
return escapeMysql(value);
}

private static boolean isEscapeNeededForString(String sql, int length) {
boolean needsEscape = false;
for (int i = 0; i < length; ++i) {
char c = sql.charAt(i);
switch (c) {
Expand Down
111 changes: 46 additions & 65 deletions src/main/java/com/baidu/hugegraph/loader/reader/jdbc/RowFetcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.List;

import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.utils.URIBuilder;
import org.slf4j.Logger;

import com.baidu.hugegraph.loader.exception.LoadException;
Expand All @@ -41,49 +40,30 @@ public class RowFetcher {

private static final Logger LOG = Log.logger(RowFetcher.class);

private final String database;
private final String table;

private final JDBCSource source;
private final Connection conn;

private final int batchSize;
private String[] columns;
private Line nextBatchStartRow;
private boolean finished;
private String[] primaryKeys;
private Line nextStartRow;
private boolean fullyFetched;

public RowFetcher(JDBCSource source) throws SQLException {
this.database = source.database();
this.table = source.table();
this.batchSize = source.batchSize();
this.conn = this.connect(source);
this.source = source;
this.conn = this.connect();
this.columns = null;
this.finished = false;
this.primaryKeys = null;
this.nextStartRow = null;
this.fullyFetched = false;
}

private Connection connect(JDBCSource source) throws SQLException {
String url = source.url();
String database = source.database();
if (url.endsWith("/")) {
url = String.format("%s%s", url, database);
} else {
url = String.format("%s/%s", url, database);
}
private Connection connect() throws SQLException {
String url = this.source.vendor().buildUrl(this.source);
LOG.info("Connect to database {}", url);

int maxTimes = source.reconnectMaxTimes();
int interval = source.reconnectInterval();

URIBuilder uriBuilder = new URIBuilder();
uriBuilder.setPath(url)
.setParameter("characterEncoding", "utf-8")
.setParameter("rewriteBatchedStatements", "true")
.setParameter("useServerPrepStmts", "false")
.setParameter("autoReconnect", "true")
.setParameter("maxReconnects", String.valueOf(maxTimes))
.setParameter("initialTimeout", String.valueOf(interval));

String driverName = source.driver();
String username = source.username();
String password = source.password();
String driverName = this.source.driver();
String username = this.source.username();
String password = this.source.password();
try {
// Register JDBC driver
Class.forName(driverName);
Expand All @@ -94,11 +74,8 @@ private Connection connect(JDBCSource source) throws SQLException {
}

public void readHeader() throws SQLException {
String sql = String.format("SELECT COLUMN_NAME " +
"FROM INFORMATION_SCHEMA.COLUMNS " +
"WHERE TABLE_NAME = '%s' " +
"AND TABLE_SCHEMA = '%s';",
this.table, this.database);
String sql = this.source.vendor().buildGetHeaderSql(this.source);
LOG.debug("The sql for reading headers is: {}", sql);
try (Statement stmt = this.conn.createStatement();
ResultSet result = stmt.executeQuery(sql)) {
List<String> columns = new ArrayList<>();
Expand All @@ -112,17 +89,36 @@ public void readHeader() throws SQLException {
}
E.checkArgument(this.columns != null && this.columns.length != 0,
"The colmuns of the table '%s' shouldn't be empty",
this.table);
this.source.table());
}

public void readPrimaryKey() throws SQLException {
String sql = this.source.vendor().buildGetPrimaryKeySql(this.source);
LOG.debug("The sql for reading primary keys is: {}", sql);
try (Statement stmt = this.conn.createStatement();
ResultSet result = stmt.executeQuery(sql)) {
List<String> columns = new ArrayList<>();
while (result.next()) {
columns.add(result.getString("COLUMN_NAME"));
}
this.primaryKeys = columns.toArray(new String[]{});
} catch (SQLException e) {
this.close();
throw e;
}
E.checkArgument(this.primaryKeys != null && this.primaryKeys.length != 0,
"The primary keys of the table '%s' shouldn't be empty",
this.source.table());
}

public List<Line> nextBatch() throws SQLException {
if (this.finished) {
if (this.fullyFetched) {
return null;
}

String select = this.buildSql();

List<Line> batch = new ArrayList<>(this.batchSize + 1);
String select = this.source.vendor().buildSelectSql(this.source,
this.nextStartRow);
List<Line> batch = new ArrayList<>(this.source.batchSize() + 1);
try (Statement stmt = this.conn.createStatement();
ResultSet result = stmt.executeQuery(select)) {
while (result.next()) {
Expand All @@ -143,32 +139,17 @@ public List<Line> nextBatch() throws SQLException {
throw e;
}

if (batch.size() != this.batchSize + 1) {
this.finished = true;
if (batch.size() != this.source.batchSize() + 1) {
this.fullyFetched = true;
} else {
// Remove the last one
this.nextBatchStartRow = batch.remove(batch.size() - 1);
Line lastLine = batch.remove(batch.size() - 1);
lastLine.retainAll(this.primaryKeys);
this.nextStartRow = lastLine;
}
return batch;
}

public String buildSql() {
int limit = this.batchSize + 1;

StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("SELECT * FROM ").append(this.table);

if (this.nextBatchStartRow != null) {
WhereBuilder where = new WhereBuilder(true);
where.gte(this.nextBatchStartRow.names(),
this.nextBatchStartRow.values());
sqlBuilder.append(where.build());
}
sqlBuilder.append(" LIMIT ").append(limit);
sqlBuilder.append(";");
return sqlBuilder.toString();
}

public void close() {
try {
this.conn.close();
Expand Down
Loading

0 comments on commit 8698380

Please sign in to comment.