Merged
3 changes: 3 additions & 0 deletions aurora-mysql-plugin/docs/AuroraMysql-batchsource.md
@@ -52,6 +52,9 @@ will be passed to the JDBC driver as connection arguments for JDBC drivers that
back from the query. However, it must match the schema that comes back from the query,
except it can mark fields as nullable and can contain a subset of the fields.

**Fetch Size:** The number of rows to fetch at a time per split. A larger fetch size can result in a faster import,
at the cost of higher memory usage.

Example
------
Suppose you want to read data from an Aurora DB MySQL database named "prod" that is running on
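The tradeoff described in the doc change above can be made concrete: each fetch is one round trip to the database, so a scan needs roughly `ceil(totalRows / fetchSize)` round trips, while each round trip buffers `fetchSize` rows in memory. A minimal sketch (the `FetchSizeMath` class and `roundTrips` helper are hypothetical, not part of the plugin):

```java
// Hypothetical helper (not part of the plugin): estimates how many
// database round trips a full table scan needs for a given fetch size.
public class FetchSizeMath {
    // ceil(totalRows / fetchSize): each round trip retrieves one batch of rows
    static long roundTrips(long totalRows, int fetchSize) {
        return (totalRows + fetchSize - 1) / fetchSize;
    }

    public static void main(String[] args) {
        // For 1,000,000 rows, the default fetch size of 1000 needs 1000 round
        // trips; a fetch size of 100 needs 10000 (slower import), while 10000
        // needs only 100 (faster, but each batch holds 10x more rows in memory).
        System.out.println(roundTrips(1_000_000, 1000)); // 1000
        System.out.println(roundTrips(1_000_000, 10_000)); // 100
    }
}
```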
9 changes: 9 additions & 0 deletions aurora-mysql-plugin/widgets/AuroraMysql-batchsource.json
@@ -73,6 +73,15 @@
"widget-attributes": {
"default": "1"
}
},
{
"widget-type": "number",
"label": "Fetch Size",
"name": "fetchSize",
"widget-attributes": {
"default": "1000",
"minimum": "0"
}
}
]
},
3 changes: 3 additions & 0 deletions aurora-postgresql-plugin/docs/AuroraPostgres-batchsource.md
@@ -54,6 +54,9 @@ disabled.
back from the query. However, it must match the schema that comes back from the query,
except it can mark fields as nullable and can contain a subset of the fields.

**Fetch Size:** The number of rows to fetch at a time per split. A larger fetch size can result in a faster import,
at the cost of higher memory usage.

Example
------
Suppose you want to read data from an Aurora DB PostgreSQL database named "prod" that is running on
@@ -73,6 +73,15 @@
"widget-attributes": {
"default": "1"
}
},
{
"widget-type": "number",
"label": "Fetch Size",
"name": "fetchSize",
"widget-attributes": {
"default": "1000",
"minimum": "0"
}
}
]
},
2 changes: 2 additions & 0 deletions cloudsql-mysql-plugin/docs/CloudSQLMySQL-batchsource.md
@@ -51,6 +51,8 @@ will be passed to the JDBC driver as connection arguments for JDBC drivers that
back from the query. However, it must match the schema that comes back from the query,
except it can mark fields as nullable and can contain a subset of the fields.

**Fetch Size:** The number of rows to fetch at a time per split. A larger fetch size can result in a faster import,
at the cost of higher memory usage.

Data Types Mapping
------------------
@@ -25,6 +25,9 @@
import io.cdap.plugin.db.SchemaReader;
import io.cdap.plugin.db.batch.source.AbstractDBSource;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;

/** Batch source to read from CloudSQL MySQL. */
@@ -112,5 +115,17 @@ public String getConnectionString() {
database,
connectionName);
}

@Override
protected Map<String, String> getDBSpecificArguments() {
if (getFetchSize() == null || getFetchSize() <= 0) {
return Collections.emptyMap();
}
Map<String, String> arguments = new HashMap<>();
    // When connected to MySQL > 5.0.2 with setFetchSize() > 0 on a statement,
    // the statement will use cursor-based fetching to retrieve rows
arguments.put("useCursorFetch", "true");
return arguments;
}
}
}
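The guard in `getDBSpecificArguments` above only emits the `useCursorFetch` connection argument when a positive fetch size is configured; otherwise the driver gets no extra arguments and keeps its default (non-cursor) row retrieval. A standalone sketch of that same behavior (the `CursorFetchArgs` class name is hypothetical):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the guard above: the useCursorFetch connection
// argument is only emitted when a positive fetch size is configured.
public class CursorFetchArgs {
    static Map<String, String> dbSpecificArguments(Integer fetchSize) {
        if (fetchSize == null || fetchSize <= 0) {
            // No (or disabled) fetch size: no driver-specific arguments
            return Collections.emptyMap();
        }
        Map<String, String> arguments = new HashMap<>();
        arguments.put("useCursorFetch", "true");
        return arguments;
    }

    public static void main(String[] args) {
        System.out.println(dbSpecificArguments(1000)); // {useCursorFetch=true}
        System.out.println(dbSpecificArguments(0));    // {}
    }
}
```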
9 changes: 9 additions & 0 deletions cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json
@@ -130,6 +130,15 @@
"kv-delimiter": "=",
"delimiter": ";"
}
},
{
"widget-type": "number",
"label": "Fetch Size",
"name": "fetchSize",
"widget-attributes": {
"default": "1000",
"minimum": "0"
}
}
]
}
@@ -51,6 +51,8 @@ will be passed to the JDBC driver as connection arguments for JDBC drivers that
back from the query. However, it must match the schema that comes back from the query,
except it can mark fields as nullable and can contain a subset of the fields.

**Fetch Size:** The number of rows to fetch at a time per split. A larger fetch size can result in a faster import,
at the cost of higher memory usage.

Examples
--------
@@ -130,6 +130,15 @@
"kv-delimiter": "=",
"delimiter": ";"
}
},
{
"widget-type": "number",
"label": "Fetch Size",
"name": "fetchSize",
"widget-attributes": {
"default": "1000",
"minimum": "0"
}
}
]
}
@@ -36,6 +36,7 @@ public class ConnectionConfigAccessor {
private static final String CONNECTION_ARGUMENTS = "io.cdap.plugin.db.connection.arguments";
private static final String INIT_QUERIES = "io.cdap.plugin.db.init.queries";
public static final String AUTO_COMMIT_ENABLED = "io.cdap.plugin.db.output.autocommit.enabled";
public static final String FETCH_SIZE = "io.cdap.plugin.db.fetch.size";

private static final Gson GSON = new Gson();
private static final Type STRING_MAP_TYPE = new TypeToken<Map<String, String>>() { }.getType();
@@ -99,6 +100,14 @@ public boolean isAutoCommitEnabled() {
return configuration.getBoolean(AUTO_COMMIT_ENABLED, false);
}

public void setFetchSize(Integer fetchSize) {
configuration.setInt(FETCH_SIZE, fetchSize);
}

public Integer getFetchSize() {
return configuration.getInt(FETCH_SIZE, 0);
}

public Configuration getConfiguration() {
return configuration;
}
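The accessor added above stores the fetch size in the job configuration under the `io.cdap.plugin.db.fetch.size` key, with `getFetchSize` defaulting to 0 when nothing was set, a value callers such as `getDBSpecificArguments` treat as "disabled". A minimal stand-in that mimics that default-to-zero semantics with a plain map (the `FetchSizeAccessor` class is hypothetical; the real accessor is backed by a Hadoop `Configuration`):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the Configuration-backed accessor above:
// stores the fetch size under a well-known key, defaulting to 0 when unset.
public class FetchSizeAccessor {
    static final String FETCH_SIZE = "io.cdap.plugin.db.fetch.size";
    private final Map<String, String> conf = new HashMap<>();

    void setFetchSize(int fetchSize) {
        conf.put(FETCH_SIZE, Integer.toString(fetchSize));
    }

    int getFetchSize() {
        // 0 means "not configured"; callers treat <= 0 as disabled
        return Integer.parseInt(conf.getOrDefault(FETCH_SIZE, "0"));
    }

    public static void main(String[] args) {
        FetchSizeAccessor accessor = new FetchSizeAccessor();
        System.out.println(accessor.getFetchSize()); // 0 until configured
        accessor.setFetchSize(1000);
        System.out.println(accessor.getFetchSize()); // 1000
    }
}
```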