diff --git a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLConnector.java b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLConnector.java index a2b93a910..9bc1bc7f5 100644 --- a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLConnector.java +++ b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLConnector.java @@ -99,6 +99,8 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa path.getSchema(), path.getTable())); properties.put(CloudSQLMySQLSource.CloudSQLMySQLSourceConfig.NUM_SPLITS, "1"); + properties.put(CloudSQLMySQLSource.CloudSQLMySQLSourceConfig.FETCH_SIZE, + CloudSQLMySQLSource.CloudSQLMySQLSourceConfig.DEFAULT_FETCH_SIZE); properties.put(ConnectionConfig.DATABASE, path.getDatabase()); properties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table)); properties.put(CloudSQLMySQLSink.CloudSQLMySQLSinkConfig.TABLE_NAME, table); diff --git a/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLConnector.java b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLConnector.java index 1e590287d..f6b0490b1 100644 --- a/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLConnector.java +++ b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLConnector.java @@ -31,7 +31,6 @@ import io.cdap.plugin.common.ReferenceNames; import io.cdap.plugin.common.db.DBConnectorPath; import io.cdap.plugin.common.db.DBPath; -import io.cdap.plugin.db.ConnectionConfig; import io.cdap.plugin.db.SchemaReader; import io.cdap.plugin.db.connector.AbstractDBSpecificConnector; import io.cdap.plugin.postgres.PostgresDBRecord; @@ -112,6 +111,8 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa sinkProperties.put(CloudSQLPostgreSQLSink.CloudSQLPostgreSQLSinkConfig.DB_SCHEMA_NAME, schema); } sourceProperties.put(CloudSQLPostgreSQLSource.CloudSQLPostgreSQLSourceConfig.NUM_SPLITS, "1"); + sourceProperties.put(CloudSQLPostgreSQLSource.CloudSQLPostgreSQLSourceConfig.FETCH_SIZE, + CloudSQLPostgreSQLSource.CloudSQLPostgreSQLSourceConfig.DEFAULT_FETCH_SIZE); String table = path.getTable(); if (table == null) { return; diff --git a/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSource.java b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSource.java index 3d8036341..8b4528329 100644 --- a/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSource.java +++ b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSource.java @@ -112,6 +112,12 @@ protected Map getDBSpecificArguments() { return Collections.emptyMap(); } + @Override + public Integer getFetchSize() { + Integer fetchSize = super.getFetchSize(); + return fetchSize == null ? Integer.parseInt(DEFAULT_FETCH_SIZE) : fetchSize; + } + @Override protected CloudSQLPostgreSQLConnectorConfig getConnection() { return connection; diff --git a/database-commons/src/main/java/io/cdap/plugin/db/batch/config/AbstractDBSpecificSourceConfig.java b/database-commons/src/main/java/io/cdap/plugin/db/batch/config/AbstractDBSpecificSourceConfig.java index 0296e728b..8a8df6a00 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/batch/config/AbstractDBSpecificSourceConfig.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/batch/config/AbstractDBSpecificSourceConfig.java @@ -48,6 +48,7 @@ public abstract class AbstractDBSpecificSourceConfig extends PluginConfig implem public static final String SCHEMA = "schema"; public static final String DATABASE = "database"; public static final String FETCH_SIZE = "fetchSize"; + public static final String DEFAULT_FETCH_SIZE = "1000"; @Name(Constants.Reference.REFERENCE_NAME) @Description(Constants.Reference.REFERENCE_NAME_DESCRIPTION) diff --git a/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerConnector.java b/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerConnector.java index 9b75c2546..875cd5de1 100644 --- a/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerConnector.java +++ b/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlServerConnector.java @@ -87,6 +87,8 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa sinkProperties.put(SqlServerSink.SqlServerSinkConfig.DB_SCHEMA_NAME, schema); } sourceProperties.put(SqlServerSource.SqlServerSourceConfig.NUM_SPLITS, "1"); + sourceProperties.put(SqlServerSource.SqlServerSourceConfig.FETCH_SIZE, + SqlServerSource.SqlServerSourceConfig.DEFAULT_FETCH_SIZE); String table = path.getTable(); if (table == null) { return; diff --git a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlConnector.java b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlConnector.java index adeb9ace1..72b1ca2b0 100644 --- a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlConnector.java +++ b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlConnector.java @@ -80,6 +80,7 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa properties.put(MysqlSource.MysqlSourceConfig.IMPORT_QUERY, getTableQuery(path.getDatabase(), path.getSchema(), path.getTable())); properties.put(MysqlSource.MysqlSourceConfig.NUM_SPLITS, "1"); + properties.put(MysqlSource.MysqlSourceConfig.FETCH_SIZE, MysqlSource.MysqlSourceConfig.DEFAULT_FETCH_SIZE); properties.put(MysqlSource.MysqlSourceConfig.DATABASE, path.getDatabase()); properties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table)); properties.put(MysqlSink.MysqlSinkConfig.TABLE_NAME, table); diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java index 8a2962282..8b6ac80c6 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java @@ -85,6 +85,8 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa sinkProperties.put(OracleSink.OracleSinkConfig.DB_SCHEMA_NAME, schema); } sourceProperties.put(OracleSource.OracleSourceConfig.NUM_SPLITS, "1"); + sourceProperties.put(OracleSource.OracleSourceConfig.FETCH_SIZE, + OracleSource.OracleSourceConfig.DEFAULT_FETCH_SIZE); String table = path.getTable(); if (table == null) { return; diff --git a/postgresql-plugin/docs/Postgres-batchsource.md b/postgresql-plugin/docs/Postgres-batchsource.md index cc4f7ab84..340d66963 100644 --- a/postgresql-plugin/docs/Postgres-batchsource.md +++ b/postgresql-plugin/docs/Postgres-batchsource.md @@ -60,7 +60,7 @@ back from the query. However, it must match the schema that comes back from the except it can mark fields as nullable and can contain a subset of the fields. **Fetch Size:** The number of rows to fetch at a time per split. Larger fetch size can result in faster import, -with the tradeoff of higher memory usage. +with the tradeoff of higher memory usage. If not specified, the default value is 1000. Example ------ diff --git a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresConnector.java b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresConnector.java index 25b8133cd..b5075c0b2 100644 --- a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresConnector.java +++ b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresConnector.java @@ -86,6 +86,10 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa sinkProperties.put(PostgresSink.PostgresSinkConfig.DB_SCHEMA_NAME, schema); } sourceProperties.put(PostgresSource.PostgresSourceConfig.NUM_SPLITS, "1"); + sourceProperties.put(PostgresSource.PostgresSourceConfig.FETCH_SIZE, + PostgresSource.PostgresSourceConfig.DEFAULT_FETCH_SIZE); + sourceProperties.put(PostgresConstants.CONNECTION_TIMEOUT, + PostgresSource.PostgresSourceConfig.DEFAULT_CONNECTION_TIMEOUT_SECONDS); String table = path.getTable(); if (table == null) { return; diff --git a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java index 12dd45003..963bba263 100644 --- a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java +++ b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java @@ -75,6 +75,7 @@ public static class PostgresSourceConfig extends AbstractDBSpecificSourceConfig public static final String NAME_USE_CONNECTION = "useConnection"; public static final String NAME_CONNECTION = "connection"; + public static final String DEFAULT_CONNECTION_TIMEOUT_SECONDS = "100"; @Name(NAME_USE_CONNECTION) @Nullable @@ -106,6 +107,12 @@ public Map getDBSpecificArguments() { return ImmutableMap.of(PostgresConstants.CONNECTION_TIMEOUT, String.valueOf(connectionTimeout)); } + @Override + public Integer getFetchSize() { + Integer fetchSize = super.getFetchSize(); + return fetchSize == null ? Integer.parseInt(DEFAULT_FETCH_SIZE) : fetchSize; + } + @Override protected AbstractDBSpecificConnectorConfig getConnection() { return connection;