Skip to content

Commit

Permalink
0004672: Quick test before upgrade of sym_data_event
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Dec 1, 2020
1 parent a9b05e6 commit c30294e
Showing 1 changed file with 74 additions and 51 deletions.
Expand Up @@ -56,6 +56,7 @@
import org.jumpmind.symmetric.ext.IDatabaseUpgradeListener;
import org.jumpmind.symmetric.ext.ISymmetricEngineAware;
import org.jumpmind.symmetric.model.TriggerHistory;
import org.jumpmind.symmetric.model.TriggerRouter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -93,57 +94,8 @@ public String beforeUpgrade(ISymmetricDialect symmetricDialect, String tablePref
}
}

if (isUpgradeFromPre311(tablePrefix, currentModel, desiredModel)) {
log.info("Checking data_event for upgrade");

List<Row> rows = engine.getDatabasePlatform().getSqlTemplateDirty().query("select batch_id, data_id, max(router_id) router_id " +
"from " + tablePrefix + "_data_event group by batch_id, data_id having count(*) > 1");

log.info("Found {} rows in data_event with duplicates", rows.size());

if (rows.size() > 0) {
long ts = System.currentTimeMillis();
int commitSize = engine.getParameterService().getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_DATA_IDS);
ISqlTransaction transaction = null;
try {
transaction = engine.getSqlTemplate().startSqlTransaction();
transaction.setInBatchMode(true);
transaction.prepare("delete from " + tablePrefix + "_data_event where batch_id = ? and data_id = ? and router_id != ?");
int[] types = new int[] { engine.getSymmetricDialect().getSqlTypeForIds(), engine.getSymmetricDialect().getSqlTypeForIds(),
Types.VARCHAR };
int uncommittedCount = 0, totalRowCount = 0;
for (Row row : rows) {
uncommittedCount += transaction.addRow(row, new Object[] { row.getLong("batch_id"), row.getLong("data_id"),
row.getString("router_id") }, types);
totalRowCount++;
if (uncommittedCount >= commitSize) {
transaction.commit();
uncommittedCount = 0;
}
if (System.currentTimeMillis() - ts > DateUtils.MILLIS_PER_MINUTE) {
log.info("Processed {} of {} rows so far", totalRowCount, rows.size());
ts = System.currentTimeMillis();
}
}
transaction.commit();
} catch (Error ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} catch (RuntimeException ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} finally {
if (transaction != null) {
transaction.close();
}
}
}

log.info("Done preparing data_event for upgrade");
if (isUpgradeFromPre311(tablePrefix, currentModel, desiredModel) && shouldFixDataEvent311()) {
fixDataEvent311(tablePrefix);
}

if (engine.getDatabasePlatform().getName().equals(DatabaseNamesConstants.INFORMIX)) {
Expand Down Expand Up @@ -258,6 +210,77 @@ protected boolean isUpgradeFromPre311(String tablePrefix, Database currentModel,
return false;
}
}

protected boolean shouldFixDataEvent311() {
List<TriggerRouter> triggerRouters = engine.getTriggerRouterService().getAllTriggerRoutersForCurrentNode(
engine.getParameterService().getNodeGroupId());
HashSet<String> set = new HashSet<String>();
boolean shouldFix = engine.getParameterService().is("upgrade.force.fix.data.event");
if (!shouldFix && !engine.getParameterService().is("upgrade.skip.fix.data.event")) {
for (TriggerRouter triggerRouter : triggerRouters) {
String key = triggerRouter.getTriggerId() + "-" + triggerRouter.getRouter().getNodeGroupLink().getTargetNodeGroupId();
if (set.contains(key)) {
shouldFix = true;
break;
}
set.add(key);
}
}
return shouldFix;
}

protected void fixDataEvent311(String tablePrefix) {
log.info("Checking data_event for upgrade");

List<Row> rows = engine.getDatabasePlatform().getSqlTemplateDirty().query("select batch_id, data_id, max(router_id) router_id " +
"from " + tablePrefix + "_data_event group by batch_id, data_id having count(*) > 1");

log.info("Found {} rows in data_event with duplicates", rows.size());

if (rows.size() > 0) {
long ts = System.currentTimeMillis();
int commitSize = engine.getParameterService().getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_DATA_IDS);
ISqlTransaction transaction = null;
try {
transaction = engine.getSqlTemplate().startSqlTransaction();
transaction.setInBatchMode(true);
transaction.prepare("delete from " + tablePrefix + "_data_event where batch_id = ? and data_id = ? and router_id != ?");
int[] types = new int[] { engine.getSymmetricDialect().getSqlTypeForIds(), engine.getSymmetricDialect().getSqlTypeForIds(),
Types.VARCHAR };
int uncommittedCount = 0, totalRowCount = 0;
for (Row row : rows) {
uncommittedCount += transaction.addRow(row, new Object[] { row.getLong("batch_id"), row.getLong("data_id"),
row.getString("router_id") }, types);
totalRowCount++;
if (uncommittedCount >= commitSize) {
transaction.commit();
uncommittedCount = 0;
}
if (System.currentTimeMillis() - ts > DateUtils.MILLIS_PER_MINUTE) {
log.info("Processed {} of {} rows so far", totalRowCount, rows.size());
ts = System.currentTimeMillis();
}
}
transaction.commit();
} catch (Error ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} catch (RuntimeException ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} finally {
if (transaction != null) {
transaction.close();
}
}
}

log.info("Done preparing data_event for upgrade");
}

@Override
public void setSymmetricEngine(ISymmetricEngine engine) {
Expand Down

0 comments on commit c30294e

Please sign in to comment.