Skip to content

Commit

Permalink
0004234: Upgrade of sym_data_event is slow
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Jan 7, 2020
1 parent 46ee51b commit e43bc3b
Showing 1 changed file with 49 additions and 15 deletions.
Expand Up @@ -45,6 +45,7 @@
import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.DatabaseNamesConstants;
import org.jumpmind.db.platform.IAlterDatabaseInterceptor;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.Row;
import org.jumpmind.db.util.MultiInstanceofPredicate;
import org.jumpmind.extension.IBuiltInExtensionPoint;
Expand Down Expand Up @@ -93,23 +94,56 @@ public String beforeUpgrade(ISymmetricDialect symmetricDialect, String tablePref
}

if (isUpgradeFromPre311(tablePrefix, currentModel, desiredModel)) {
long ts = System.currentTimeMillis();
int batchCount = 0, rowCount = 0;
log.info("Preparing data_event for upgrade by clearing unrouted batches");
List<Row> rows = engine.getSqlTemplate().query("select batch_id from " + tablePrefix + "_outgoing_batch where node_id = '-1'");
for (Row row : rows) {
long batchId = row.getLong("batch_id");
rowCount += engine.getSqlTemplate().update("delete from " + tablePrefix + "_data where data_id in (select data_id from " + tablePrefix
+ "_data_event where batch_id = " + batchId + ")");
rowCount += engine.getSqlTemplate().update("delete from " + tablePrefix + "_data_event where batch_id = " + batchId);
rowCount += engine.getSqlTemplate().update("delete from " + tablePrefix + "_outgoing_batch where batch_id = " + batchId);
batchCount++;
if (System.currentTimeMillis() - ts > DateUtils.MILLIS_PER_MINUTE) {
log.info("Cleared {} batches and {} rows so far", batchCount, rowCount);
ts = System.currentTimeMillis();
log.info("Checking data_event for upgrade");

List<Row> rows = engine.getDatabasePlatform().getSqlTemplateDirty().query("select batch_id, data_id, max(router_id) router_id " +
"from " + tablePrefix + "_data_event group by batch_id, data_id having count(*) > 1");

log.info("Found {} rows in data_event with duplicates", rows.size());

if (rows.size() > 0) {
long ts = System.currentTimeMillis();
int commitSize = engine.getParameterService().getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_DATA_IDS);
ISqlTransaction transaction = null;
try {
transaction = engine.getSqlTemplate().startSqlTransaction();
transaction.setInBatchMode(true);
transaction.prepare("delete from " + tablePrefix + "_data_event where batch_id = ? and data_id = ? and router_id != ?");
int[] types = new int[] { engine.getSymmetricDialect().getSqlTypeForIds(), engine.getSymmetricDialect().getSqlTypeForIds(),
Types.VARCHAR };
int uncommittedCount = 0, totalRowCount = 0;
for (Row row : rows) {
uncommittedCount += transaction.addRow(row, new Object[] { row.getLong("batch_id"), row.getLong("data_id"),
row.getString("router_id") }, types);
totalRowCount++;
if (uncommittedCount >= commitSize) {
transaction.commit();
uncommittedCount = 0;
}
if (System.currentTimeMillis() - ts > DateUtils.MILLIS_PER_MINUTE) {
log.info("Processed {} of {} rows so far", totalRowCount, rows.size());
ts = System.currentTimeMillis();
}
}
transaction.commit();
} catch (Error ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} catch (RuntimeException ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} finally {
if (transaction != null) {
transaction.close();
}
}
}
log.info("Done preparing data_event, cleared {} batches and {} rows", batchCount, rowCount);

log.info("Done preparing data_event for upgrade");
}

if (engine.getDatabasePlatform().getName().equals(DatabaseNamesConstants.INFORMIX)) {
Expand Down

0 comments on commit e43bc3b

Please sign in to comment.