Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
path.getSchema(),
path.getTable()));
properties.put(CloudSQLMySQLSource.CloudSQLMySQLSourceConfig.NUM_SPLITS, "1");
properties.put(CloudSQLMySQLSource.CloudSQLMySQLSourceConfig.FETCH_SIZE,
CloudSQLMySQLSource.CloudSQLMySQLSourceConfig.DEFAULT_FETCH_SIZE);
properties.put(ConnectionConfig.DATABASE, path.getDatabase());
properties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
properties.put(CloudSQLMySQLSink.CloudSQLMySQLSinkConfig.TABLE_NAME, table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.cdap.plugin.common.ReferenceNames;
import io.cdap.plugin.common.db.DBConnectorPath;
import io.cdap.plugin.common.db.DBPath;
import io.cdap.plugin.db.ConnectionConfig;
import io.cdap.plugin.db.SchemaReader;
import io.cdap.plugin.db.connector.AbstractDBSpecificConnector;
import io.cdap.plugin.postgres.PostgresDBRecord;
Expand Down Expand Up @@ -112,6 +111,8 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
sinkProperties.put(CloudSQLPostgreSQLSink.CloudSQLPostgreSQLSinkConfig.DB_SCHEMA_NAME, schema);
}
sourceProperties.put(CloudSQLPostgreSQLSource.CloudSQLPostgreSQLSourceConfig.NUM_SPLITS, "1");
sourceProperties.put(CloudSQLPostgreSQLSource.CloudSQLPostgreSQLSourceConfig.FETCH_SIZE,
CloudSQLPostgreSQLSource.CloudSQLPostgreSQLSourceConfig.DEFAULT_FETCH_SIZE);
String table = path.getTable();
if (table == null) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ protected Map<String, String> getDBSpecificArguments() {
return Collections.emptyMap();
}

@Override
public Integer getFetchSize() {
Integer fetchSize = super.getFetchSize();
return fetchSize == null ? Integer.parseInt(DEFAULT_FETCH_SIZE) : fetchSize;
}

@Override
protected CloudSQLPostgreSQLConnectorConfig getConnection() {
return connection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public abstract class AbstractDBSpecificSourceConfig extends PluginConfig implem
public static final String SCHEMA = "schema";
public static final String DATABASE = "database";
public static final String FETCH_SIZE = "fetchSize";
public static final String DEFAULT_FETCH_SIZE = "1000";

@Name(Constants.Reference.REFERENCE_NAME)
@Description(Constants.Reference.REFERENCE_NAME_DESCRIPTION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
sinkProperties.put(SqlServerSink.SqlServerSinkConfig.DB_SCHEMA_NAME, schema);
}
sourceProperties.put(SqlServerSource.SqlServerSourceConfig.NUM_SPLITS, "1");
sourceProperties.put(SqlServerSource.SqlServerSourceConfig.FETCH_SIZE,
SqlServerSource.SqlServerSourceConfig.DEFAULT_FETCH_SIZE);
String table = path.getTable();
if (table == null) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
properties.put(MysqlSource.MysqlSourceConfig.IMPORT_QUERY, getTableQuery(path.getDatabase(), path.getSchema(),
path.getTable()));
properties.put(MysqlSource.MysqlSourceConfig.NUM_SPLITS, "1");
properties.put(MysqlSource.MysqlSourceConfig.FETCH_SIZE, MysqlSource.MysqlSourceConfig.DEFAULT_FETCH_SIZE);
properties.put(MysqlSource.MysqlSourceConfig.DATABASE, path.getDatabase());
properties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
properties.put(MysqlSink.MysqlSinkConfig.TABLE_NAME, table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
sinkProperties.put(OracleSink.OracleSinkConfig.DB_SCHEMA_NAME, schema);
}
sourceProperties.put(OracleSource.OracleSourceConfig.NUM_SPLITS, "1");
sourceProperties.put(OracleSource.OracleSourceConfig.FETCH_SIZE,
OracleSource.OracleSourceConfig.DEFAULT_FETCH_SIZE);
String table = path.getTable();
if (table == null) {
return;
Expand Down
2 changes: 1 addition & 1 deletion postgresql-plugin/docs/Postgres-batchsource.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ back from the query. However, it must match the schema that comes back from the
except it can mark fields as nullable and can contain a subset of the fields.

**Fetch Size:** The number of rows to fetch at a time per split. Larger fetch size can result in faster import,
with the tradeoff of higher memory usage.
with the tradeoff of higher memory usage. If not specified, the default value is 1000.

Example
------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
sinkProperties.put(PostgresSink.PostgresSinkConfig.DB_SCHEMA_NAME, schema);
}
sourceProperties.put(PostgresSource.PostgresSourceConfig.NUM_SPLITS, "1");
sourceProperties.put(PostgresSource.PostgresSourceConfig.FETCH_SIZE,
PostgresSource.PostgresSourceConfig.DEFAULT_FETCH_SIZE);
sourceProperties.put(PostgresConstants.CONNECTION_TIMEOUT,
PostgresSource.PostgresSourceConfig.DEFAULT_CONNECTION_TIMEOUT_SECONDS);
String table = path.getTable();
if (table == null) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public static class PostgresSourceConfig extends AbstractDBSpecificSourceConfig

public static final String NAME_USE_CONNECTION = "useConnection";
public static final String NAME_CONNECTION = "connection";
public static final String DEFAULT_CONNECTION_TIMEOUT_SECONDS = "100";

@Name(NAME_USE_CONNECTION)
@Nullable
Expand Down Expand Up @@ -106,6 +107,12 @@ public Map<String, String> getDBSpecificArguments() {
return ImmutableMap.of(PostgresConstants.CONNECTION_TIMEOUT, String.valueOf(connectionTimeout));
}

@Override
public Integer getFetchSize() {
Integer fetchSize = super.getFetchSize();
return fetchSize == null ? Integer.parseInt(DEFAULT_FETCH_SIZE) : fetchSize;
}

@Override
protected AbstractDBSpecificConnectorConfig getConnection() {
return connection;
Expand Down