Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public InputFormatProvider getInputFormatProvider(ConnectorContext context, Samp
DBConfiguration.configureDB(connectionConfigAccessor.getConfiguration(), driverClass.getName(),
getConnectionString(path.getDatabase()), config.getUser(), config.getPassword());
}
String tableQuery = getTableQuery(path, request.getLimit());
String tableQuery = getTableQuery(path.getDatabase(), path.getSchema(), path.getTable(), request.getLimit());
DataDrivenETLDBInputFormat.setInput(connectionConfigAccessor.getConfiguration(), getDBRecordType(),
tableQuery, null, false);
connectionConfigAccessor.setConnectionArguments(Maps.fromProperties(config.getConnectionArgumentsProperties()));
Expand Down Expand Up @@ -116,16 +116,16 @@ protected String getConnectionString(String database) {
return config.getConnectionString();
}

protected String getTableQuery(DBConnectorPath path) {
return path.getSchema() == null ? String.format("SELECT * FROM \"%s\".\"%s\"", path.getDatabase(), path.getTable())
: String.format("SELECT * FROM \"%s\".\"%s\".\"%s\"", path.getDatabase(), path.getSchema(), path.getTable());
protected String getTableQuery(String database, String schema, String table) {
return schema == null ? String.format("SELECT * FROM \"%s\".\"%s\"", database, table)
: String.format("SELECT * FROM \"%s\".\"%s\".\"%s\"", database, schema, table);
}

protected String getTableQuery(DBConnectorPath path, int limit) {
return path.getSchema() == null ?
String.format("SELECT * FROM \"%s\".\"%s\" LIMIT %d", path.getDatabase(), path.getTable(), limit) :
protected String getTableQuery(String database, String schema, String table, int limit) {
return schema == null ?
String.format("SELECT * FROM \"%s\".\"%s\" LIMIT %d", database, table, limit) :
String.format(
"SELECT * FROM \"%s\".\"%s\".\"%s\" LIMIT %d", path.getDatabase(), path.getSchema(), path.getTable(), limit);
"SELECT * FROM \"%s\".\"%s\".\"%s\" LIMIT %d", database, schema, table, limit);
}

protected Schema loadTableSchema(Connection connection, String query) throws SQLException {
Expand All @@ -144,4 +144,11 @@ protected void setConnectionProperties(Map<String, String> properties) {
properties.put(ConnectionConfig.PASSWORD, rawProperties.get(ConnectionConfig.PASSWORD));
properties.put(ConnectionConfig.CONNECTION_ARGUMENTS, rawProperties.get(ConnectionConfig.CONNECTION_ARGUMENTS));
}

@Override
protected Schema getTableSchema(Connection connection, String database,
String schema, String table) throws SQLException {

return loadTableSchema(getConnection(), getTableQuery(database, schema, table));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of changing definition of method getTableQuery(), can we do something like this?
return loadTableSchema(getConnection(), getTableQuery(new DBSpecificPath(supportSchema(), database, schema, table));

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant to do this to de-couple the db path and table query, mainly they are independent logically.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
return;
}

properties.put(SqlServerSource.SqlServerSourceConfig.IMPORT_QUERY, getTableQuery(path));
properties.put(SqlServerSource.SqlServerSourceConfig.IMPORT_QUERY, getTableQuery(path.getDatabase(),
path.getSchema(),
path.getTable()));
properties.put(SqlServerSource.SqlServerSourceConfig.NUM_SPLITS, "1");
properties.put(SqlServerSource.SqlServerSourceConfig.DATABASE, path.getDatabase());
properties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
Expand All @@ -92,9 +94,9 @@ public StructuredRecord transform(LongWritable longWritable, SqlServerSourceDBRe
}

@Override
protected String getTableQuery(DBConnectorPath path, int limit) {
protected String getTableQuery(String database, String schema, String table, int limit) {
return String.format(
"SELECT TOP(%d) * FROM \"%s\".\"%s\".\"%s\"", limit, path.getDatabase(), path.getSchema(), path.getTable());
"SELECT TOP(%d) * FROM \"%s\".\"%s\".\"%s\"", limit, database, schema, table);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,21 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
return;
}

properties.put(MysqlSource.MysqlSourceConfig.IMPORT_QUERY, getTableQuery(path));
properties.put(MysqlSource.MysqlSourceConfig.IMPORT_QUERY, getTableQuery(path.getDatabase(), path.getSchema(),
path.getTable()));
properties.put(MysqlSource.MysqlSourceConfig.NUM_SPLITS, "1");
properties.put(MysqlSource.MysqlSourceConfig.DATABASE, path.getDatabase());
properties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
}

@Override
protected String getTableQuery(DBConnectorPath path) {
return String.format("SELECT * FROM `%s`.`%s`", path.getDatabase(), path.getTable());
protected String getTableQuery(String database, String schema, String table) {
return String.format("SELECT * FROM `%s`.`%s`", database, table);
}

@Override
protected String getTableQuery(DBConnectorPath path, int limit) {
return String.format("SELECT * FROM `%s`.`%s` LIMIT %d", path.getDatabase(), path.getTable(), limit);
protected String getTableQuery(String database, String schema, String table, int limit) {
return String.format("SELECT * FROM `%s`.`%s` LIMIT %d", database, table, limit);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
return;
}

properties.put(OracleSource.OracleSourceConfig.IMPORT_QUERY, getTableQuery(path));
properties.put(OracleSource.OracleSourceConfig.IMPORT_QUERY, getTableQuery(path.getDatabase(), path.getSchema(),
path.getTable()));
properties.put(OracleSource.OracleSourceConfig.NUM_SPLITS, "1");
properties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
}
Expand Down Expand Up @@ -123,13 +124,13 @@ protected String getConnectionString(@Nullable String database) {
}

@Override
protected String getTableQuery(DBConnectorPath path) {
return String.format("SELECT * from \"%s\".\"%s\"", path.getSchema(), path.getTable());
protected String getTableQuery(String database, String schema, String table) {
return String.format("SELECT * from \"%s\".\"%s\"", schema, table);
}

@Override
protected String getTableQuery(DBConnectorPath path, int limit) {
return String.format("SELECT * FROM \"%s\".\"%s\" WHERE ROWNUM <= %d", path.getSchema(), path.getTable(), limit);
protected String getTableQuery(String database, String schema, String table, int limit) {
return String.format("SELECT * FROM \"%s\".\"%s\" WHERE ROWNUM <= %d", schema, table, limit);
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- version properties -->
<cdap.version>6.5.0</cdap.version>
<cdap.plugin.version>2.7.0</cdap.plugin.version>
<cdap.plugin.version>2.7.2-SNAPSHOT</cdap.plugin.version>
<guava.version>13.0.1</guava.version>
<hadoop.version>2.3.0</hadoop.version>
<hsql.version>2.2.4</hsql.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
return;
}

properties.put(PostgresSource.PostgresSourceConfig.IMPORT_QUERY, getTableQuery(path));
properties.put(PostgresSource.PostgresSourceConfig.IMPORT_QUERY, getTableQuery(path.getDatabase(),
path.getSchema(), path.getTable()));
properties.put(PostgresSource.PostgresSourceConfig.NUM_SPLITS, "1");
properties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));

Expand All @@ -104,13 +105,14 @@ public StructuredRecord transform(LongWritable longWritable, PostgresDBRecord re
return record.getRecord();
}

protected String getTableQuery(DBConnectorPath path) {
return String.format("SELECT * FROM \"%s\".\"%s\"", path.getSchema(), path.getTable());
@Override
protected String getTableQuery(String database, String schema, String table) {
return String.format("SELECT * FROM \"%s\".\"%s\"", schema, table);
}

@Override
protected String getTableQuery(DBConnectorPath path, int limit) {
return String.format("SELECT * FROM \"%s\".\"%s\" LIMIT %d", path.getSchema(), path.getTable(), limit);
protected String getTableQuery(String database, String schema, String table, int limit) {
return String.format("SELECT * FROM \"%s\".\"%s\" LIMIT %d", schema, table, limit);
}

}