Skip to content

Commit

Permalink
0004673: Quick test before upgrade of sym_data_event
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Dec 1, 2020
1 parent 44d36e3 commit 126b9b4
Showing 1 changed file with 75 additions and 54 deletions.
Expand Up @@ -28,7 +28,6 @@
import java.util.Set;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.Predicate;
import org.apache.commons.lang3.time.DateUtils;
import org.jumpmind.db.alter.AddColumnChange;
import org.jumpmind.db.alter.AddPrimaryKeyChange;
Expand Down Expand Up @@ -56,6 +55,7 @@
import org.jumpmind.symmetric.ext.IDatabaseUpgradeListener;
import org.jumpmind.symmetric.ext.ISymmetricEngineAware;
import org.jumpmind.symmetric.model.TriggerHistory;
import org.jumpmind.symmetric.model.TriggerRouter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -94,58 +94,8 @@ public String beforeUpgrade(ISymmetricDialect symmetricDialect, String tablePref
}
}

if (isUpgradeFromPre311(tablePrefix, currentModel, desiredModel)) {
// TODO: check if config even has two routers for this node
log.info("Checking data_event for upgrade");

List<Row> rows = engine.getDatabasePlatform().getSqlTemplateDirty().query("select batch_id, data_id, max(router_id) router_id " +
"from " + tablePrefix + "_data_event group by batch_id, data_id having count(*) > 1");

log.info("Found {} rows in data_event with duplicates", rows.size());

if (rows.size() > 0) {
long ts = System.currentTimeMillis();
int commitSize = engine.getParameterService().getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_DATA_IDS);
ISqlTransaction transaction = null;
try {
transaction = engine.getSqlTemplate().startSqlTransaction();
transaction.setInBatchMode(true);
transaction.prepare("delete from " + tablePrefix + "_data_event where batch_id = ? and data_id = ? and router_id != ?");
int[] types = new int[] { engine.getSymmetricDialect().getSqlTypeForIds(), engine.getSymmetricDialect().getSqlTypeForIds(),
Types.VARCHAR };
int uncommittedCount = 0, totalRowCount = 0;
for (Row row : rows) {
uncommittedCount += transaction.addRow(row, new Object[] { row.getLong("batch_id"), row.getLong("data_id"),
row.getString("router_id") }, types);
totalRowCount++;
if (uncommittedCount >= commitSize) {
transaction.commit();
uncommittedCount = 0;
}
if (System.currentTimeMillis() - ts > DateUtils.MILLIS_PER_MINUTE) {
log.info("Processed {} of {} rows so far", totalRowCount, rows.size());
ts = System.currentTimeMillis();
}
}
transaction.commit();
} catch (Error ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} catch (RuntimeException ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} finally {
if (transaction != null) {
transaction.close();
}
}
}

log.info("Done preparing data_event for upgrade");
if (isUpgradeFromPre311(tablePrefix, currentModel, desiredModel) && shouldFixDataEvent311()) {
fixDataEvent311(tablePrefix);
}

if (isUpgradeFromPre312(tablePrefix, currentModel, desiredModel)) {
Expand Down Expand Up @@ -214,7 +164,7 @@ public String beforeUpgrade(ISymmetricDialect symmetricDialect, String tablePref
List<IModelChange> modelChanges = engine.getDatabasePlatform().getDdlBuilder().getDetectedChanges(currentModel, desiredModel,
alterDatabaseInterceptors.toArray(new IAlterDatabaseInterceptor[alterDatabaseInterceptors.size()]));

Predicate predicate = new MultiInstanceofPredicate(
MultiInstanceofPredicate predicate = new MultiInstanceofPredicate(
new Class[] { RemovePrimaryKeyChange.class, AddPrimaryKeyChange.class, PrimaryKeyChange.class, RemoveColumnChange.class,
AddColumnChange.class, ColumnDataTypeChange.class, ColumnSizeChange.class, CopyColumnValueChange.class });
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -304,6 +254,77 @@ protected boolean isUpgradeFromPre311(String tablePrefix, Database currentModel,
return false;
}
}

protected boolean shouldFixDataEvent311() {
List<TriggerRouter> triggerRouters = engine.getTriggerRouterService().getAllTriggerRoutersForCurrentNode(
engine.getParameterService().getNodeGroupId());
HashSet<String> set = new HashSet<String>();
boolean shouldFix = engine.getParameterService().is("upgrade.force.fix.data.event");
if (!shouldFix && !engine.getParameterService().is("upgrade.skip.fix.data.event")) {
for (TriggerRouter triggerRouter : triggerRouters) {
String key = triggerRouter.getTriggerId() + "-" + triggerRouter.getRouter().getNodeGroupLink().getTargetNodeGroupId();
if (set.contains(key)) {
shouldFix = true;
break;
}
set.add(key);
}
}
return shouldFix;
}

protected void fixDataEvent311(String tablePrefix) {
log.info("Checking data_event for upgrade");

List<Row> rows = engine.getDatabasePlatform().getSqlTemplateDirty().query("select batch_id, data_id, max(router_id) router_id " +
"from " + tablePrefix + "_data_event group by batch_id, data_id having count(*) > 1");

log.info("Found {} rows in data_event with duplicates", rows.size());

if (rows.size() > 0) {
long ts = System.currentTimeMillis();
int commitSize = engine.getParameterService().getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_DATA_IDS);
ISqlTransaction transaction = null;
try {
transaction = engine.getSqlTemplate().startSqlTransaction();
transaction.setInBatchMode(true);
transaction.prepare("delete from " + tablePrefix + "_data_event where batch_id = ? and data_id = ? and router_id != ?");
int[] types = new int[] { engine.getSymmetricDialect().getSqlTypeForIds(), engine.getSymmetricDialect().getSqlTypeForIds(),
Types.VARCHAR };
int uncommittedCount = 0, totalRowCount = 0;
for (Row row : rows) {
uncommittedCount += transaction.addRow(row, new Object[] { row.getLong("batch_id"), row.getLong("data_id"),
row.getString("router_id") }, types);
totalRowCount++;
if (uncommittedCount >= commitSize) {
transaction.commit();
uncommittedCount = 0;
}
if (System.currentTimeMillis() - ts > DateUtils.MILLIS_PER_MINUTE) {
log.info("Processed {} of {} rows so far", totalRowCount, rows.size());
ts = System.currentTimeMillis();
}
}
transaction.commit();
} catch (Error ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} catch (RuntimeException ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} finally {
if (transaction != null) {
transaction.close();
}
}
}

log.info("Done preparing data_event for upgrade");
}

protected boolean isUpgradeFromPre312(String tablePrefix, Database currentModel, Database desiredModel) {
Table eventTable = currentModel.findTable(tablePrefix + "_" + TableConstants.SYM_NODE_SECURITY);
Expand Down

0 comments on commit 126b9b4

Please sign in to comment.