Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
0000997: dbimport throttle
  • Loading branch information
chenson42 committed Jan 23, 2013
1 parent 579bd6c commit cf504e5
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 13 deletions.
Expand Up @@ -121,6 +121,7 @@ private ParameterConstants() {
public final static String DATA_LOADER_NUM_OF_ACK_RETRIES = "num.of.ack.retries";
public final static String DATA_LOADER_TIME_BETWEEN_ACK_RETRIES = "time.between.ack.retries.ms";
public final static String DATA_LOADER_MAX_ROWS_BEFORE_COMMIT = "dataloader.max.rows.before.commit";
public final static String DATA_LOADER_SLEEP_TIME_AFTER_EARLY_COMMIT = "dataloader.sleep.time.after.early.commit";
public final static String DATA_LOADER_TREAT_DATETIME_AS_VARCHAR = "db.treat.date.time.as.varchar.enabled";

public final static String DATA_RELOAD_IS_BATCH_INSERT_TRANSACTIONAL = "datareload.batch.insert.transactional";
Expand Down
Expand Up @@ -84,6 +84,7 @@ protected DatabaseWriterSettings buildDatabaseWriterSettings(
settings.setDatabaseWriterErrorHandlers(errorHandlers);
settings.setMaxRowsBeforeCommit(parameterService
.getLong(ParameterConstants.DATA_LOADER_MAX_ROWS_BEFORE_COMMIT));
settings.setCommitSleepInterval(parameterService.getLong(ParameterConstants.DATA_LOADER_SLEEP_TIME_AFTER_EARLY_COMMIT));
settings.setIgnoreMissingTables(parameterService.is(ParameterConstants.DATA_LOADER_IGNORE_MISSING_TABLES));
settings.setTreatDateTimeFieldsAsVarchar(parameterService
.is(ParameterConstants.DATA_LOADER_TREAT_DATETIME_AS_VARCHAR));
Expand Down
Expand Up @@ -740,6 +740,13 @@ dataloader.ignore.missing.tables=true
# Tags: load
dataloader.max.rows.before.commit=10000

# Amount of time to sleep before continuing data load after dataloader.max.rows.before.commit rows have been loaded.
# This is useful to give other application threads a chance to do work before continuing to load.
#
# DatabaseOverridable: true
# Tags: load
dataloader.sleep.time.after.early.commit=5

# The number of milliseconds parameters will be cached by the ParameterService before they are reread from the
# file system and database.
#
Expand Down
Expand Up @@ -221,16 +221,21 @@ public void write(CsvData data) {
protected void checkForEarlyCommit() {
if (uncommittedCount >= writerSettings.getMaxRowsBeforeCommit()) {
commit(true);
/*
* Chances are if SymmetricDS is configured to commit early in a
* batch we want to give other threads a chance to do work and
* access the database. This was added to support H2 clients
* that are loading big batches while an application is doing work.
*/
try {
Thread.sleep(writerSettings.getCommitSleepInterval());
} catch (InterruptedException e) {
log.warn("{}", e.getMessage());

long sleep = writerSettings.getCommitSleepInterval();
if (sleep > 0) {
/*
* Chances are if SymmetricDS is configured to commit early in a
* batch we want to give other threads a chance to do work and
* access the database. This was added to support H2 clients
* that are loading big batches while an application is doing
* work.
*/
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
log.warn("{}", e.getMessage());
}
}
}
}
Expand Down
Expand Up @@ -33,7 +33,7 @@ public class DatabaseWriterSettings {
protected long maxRowsBeforeCommit = 10000;

// Milliseconds to sleep between commits.
protected int commitSleepInterval = 5;
protected long commitSleepInterval = 5;

protected boolean treatDateTimeFieldsAsVarchar = false;

Expand Down Expand Up @@ -235,11 +235,11 @@ public Conflict pickConflict(Table table, Batch batch) {

}

public int getCommitSleepInterval() {
public long getCommitSleepInterval() {
return commitSleepInterval;
}

public void setCommitSleepInterval(int commitSleepInterval) {
public void setCommitSleepInterval(long commitSleepInterval) {
this.commitSleepInterval = commitSleepInterval;
}

Expand Down

0 comments on commit cf504e5

Please sign in to comment.