Skip to content
Permalink
Browse files
feat: support use sql to construct graph for loader (#263)
  • Loading branch information
simon824 committed May 5, 2022
1 parent 4caf46d commit e0b34a7b36ded05b11dd972e67c8844194a78149
Showing 3 changed files with 38 additions and 9 deletions.
@@ -59,11 +59,13 @@ public JDBCSource source() {
public void init(LoadContext context, InputStruct struct)
throws InitException {
this.progress(context, struct);
try {
this.source.header(this.fetcher.readHeader());
this.fetcher.readPrimaryKey();
} catch (SQLException e) {
throw new InitException("Failed to fetch table structure info", e);
if (!this.source.existsCustomSQL()) {
try {
this.source.header(this.fetcher.readHeader());
this.fetcher.readPrimaryKey();
} catch (SQLException e) {
throw new InitException("Failed to fetch table structure info", e);
}
}
}

@@ -22,6 +22,7 @@
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
@@ -95,6 +96,16 @@ public String[] readHeader() throws SQLException {
return this.columns;
}

private String[] readHeader(ResultSet rs) throws SQLException {
ResultSetMetaData metaData = rs.getMetaData();
List<String> columns = new ArrayList<>();
for (int i = 1; i <= metaData.getColumnCount(); i++) {
columns.add(metaData.getColumnName(i));
}
this.columns = columns.toArray(new String[]{});
return this.columns;
}

public void readPrimaryKey() throws SQLException {
String sql = this.source.vendor().buildGetPrimaryKeySql(this.source);
LOG.debug("The sql for reading primary keys is: {}", sql);
@@ -119,13 +130,18 @@ public List<Line> nextBatch() throws SQLException {
return null;
}

String select = this.source.vendor().buildSelectSql(this.source,
this.nextStartRow);
String select = this.source.existsCustomSQL() ?
this.source.customSQL() :
this.source.vendor().buildSelectSql(this.source, this.nextStartRow);

LOG.debug("The sql for select is: {}", select);

List<Line> batch = new ArrayList<>(this.source.batchSize() + 1);
try (Statement stmt = this.conn.createStatement();
ResultSet result = stmt.executeQuery(select)) {
if (this.source.existsCustomSQL()) {
this.readHeader(result);
}
while (result.next()) {
Object[] values = new Object[this.columns.length];
for (int i = 1, n = this.columns.length; i <= n; i++) {
@@ -144,7 +160,7 @@ public List<Line> nextBatch() throws SQLException {
throw e;
}

if (batch.size() != this.source.batchSize() + 1) {
if (this.source.existsCustomSQL() || batch.size() != this.source.batchSize() + 1) {
this.fullyFetched = true;
} else {
// Remove the last one
@@ -29,6 +29,8 @@
@JsonPropertyOrder({"type", "vendor"})
public class JDBCSource extends AbstractSource {

@JsonProperty("custom_sql")
private String customSQL;
@JsonProperty("vendor")
private JDBCVendor vendor;
@JsonProperty("driver")
@@ -59,16 +61,25 @@ public void check() throws IllegalArgumentException {
E.checkArgument(this.vendor != null, "The vendor can't be null");
E.checkArgument(this.url != null, "The url can't be null");
E.checkArgument(this.database != null, "The database can't be null");
E.checkArgument(this.table != null, "The table can't be null");
E.checkArgument(this.username != null, "The username can't be null");
E.checkArgument(this.password != null, "The password can't be null");
E.checkArgument(this.table != null || this.customSQL != null,
"At least one of table and sql can't be null");

this.schema = this.vendor.checkSchema(this);
if (this.driver == null) {
this.driver = this.vendor.defaultDriver();
}
}

public String customSQL() {
return this.customSQL;
}

public boolean existsCustomSQL() {
return this.customSQL != null;
}

public JDBCVendor vendor() {
return this.vendor;
}

0 comments on commit e0b34a7

Please sign in to comment.