Skip to content

Commit

Permalink
0005148: Faster save performance for Configure -> Table Triggers and
Browse files Browse the repository at this point in the history
Routing screens
  • Loading branch information
erilong committed Dec 10, 2021
1 parent 30a71b0 commit 4178fe0
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 80 deletions.
Expand Up @@ -219,6 +219,7 @@ private ParameterConstants() {
public final static String DATA_RELOAD_IS_BATCH_INSERT_TRANSACTIONAL = "datareload.batch.insert.transactional";
public final static String DATA_EXTRACTOR_ENABLED = "dataextractor.enable";
public final static String DATA_EXTRACTOR_TEXT_COLUMN_EXPRESSION = "dataextractor.text.column.expression";
public final static String DATA_FLUSH_JDBC_BATCH_SIZE = "data.flush.jdbc.batch.size";
public final static String OUTGOING_BATCH_MAX_BATCHES_TO_SELECT = "outgoing.batches.max.to.select";
public final static String DBDIALECT_ORACLE_USE_TRANSACTION_VIEW = "oracle.use.transaction.view";
public final static String DBDIALECT_ORACLE_TEMPLATE_NUMBER_SPEC = "oracle.template.precision";
Expand Down
Expand Up @@ -138,8 +138,14 @@ public interface ITriggerRouterService {
public List<Trigger> getTriggers(boolean replaceTokens);

public void saveTrigger(Trigger trigger);

public void insertTriggers(Collection<Trigger> triggers);

public void updateTriggers(Collection<Trigger> triggers);

public void deleteTrigger(Trigger trigger);

public void deleteTriggers(Collection<Trigger> triggers);

public void dropTriggers();

Expand Down Expand Up @@ -195,12 +201,20 @@ public String getTriggerName(DataEventType dml, int maxTriggerNameLength, Trigge

public void deleteTriggerRouter(String triggerId, String routerId);

public void deleteTriggerRouters(Collection<TriggerRouter> triggerRouters);

public void deleteAllTriggerRouters();

public void saveTriggerRouter(TriggerRouter triggerRouter, boolean updateTriggerRouterTableOnly);

public void saveTriggerRouter(TriggerRouter triggerRouter);

public void insertTriggerRouters(Collection<TriggerRouter> triggerRouters);

public void insertTriggersAndTriggerRouters(Collection<Trigger> triggers, Collection<TriggerRouter> triggerRouters);

public void updateTriggerRouters(Collection<TriggerRouter> triggerRouters);

public void syncTrigger(Trigger trigger, ITriggerCreationListener listener, boolean force);

public void syncTrigger(Trigger trigger, ITriggerCreationListener listener, boolean force, boolean verifyTrigger);
Expand Down
Expand Up @@ -194,6 +194,41 @@ public void deleteTrigger(Trigger trigger) {
sqlTemplate.update(getSql("deleteTriggerSql"), (Object) trigger.getTriggerId());
}

@Override
public void deleteTriggers(Collection<Trigger> triggers) {
List<TriggerRouter> triggerRouters = getTriggerRouters(true);
List<TriggerRouter> triggerRoutersToDelete = new ArrayList<TriggerRouter>();
for (TriggerRouter triggerRouter : triggerRouters) {
if (triggers.contains(triggerRouter.getTrigger())) {
triggerRoutersToDelete.add(triggerRouter);
}
}
ISqlTransaction transaction = null;
try {
int maxRowsToFlush = parameterService.getInt(ParameterConstants.DATA_FLUSH_JDBC_BATCH_SIZE);
transaction = sqlTemplate.startSqlTransaction();
deleteTriggerRouters(transaction, triggerRoutersToDelete);
transaction.prepare(getSql("deleteTriggerSql"));
int[] types = new int[] { Types.VARCHAR };
int rowCount = 0;
for (Trigger trigger : triggers) {
transaction.addRow(null, new Object[] { trigger.getTriggerId() }, types);
if (++rowCount > maxRowsToFlush) {
transaction.flush();
}
}
transaction.commit();
} catch (Exception e) {
if (transaction != null) {
transaction.rollback();
}
throw e;
} finally {
close(transaction);
}
clearCache();
}

public void dropTriggers() {
List<TriggerHistory> activeHistories = getActiveTriggerHistories();
for (TriggerHistory history : activeHistories) {
Expand Down Expand Up @@ -958,6 +993,38 @@ public void deleteTriggerRouter(TriggerRouter triggerRouter) {
clearCache();
}

@Override
public void deleteTriggerRouters(Collection<TriggerRouter> triggerRouters) {
ISqlTransaction transaction = null;
try {
transaction = sqlTemplate.startSqlTransaction();
deleteTriggerRouters(transaction, triggerRouters);
transaction.commit();
} catch (Exception e) {
if (transaction != null) {
transaction.rollback();
}
throw e;
} finally {
close(transaction);
}
clearCache();
}

protected void deleteTriggerRouters(ISqlTransaction transaction, Collection<TriggerRouter> triggerRouters) {
int maxRowsToFlush = parameterService.getInt(ParameterConstants.DATA_FLUSH_JDBC_BATCH_SIZE);
transaction.prepare(getSql("deleteTriggerRouterSql"));
int[] types = new int[] { Types.VARCHAR };
int rowCount = 0;
for (TriggerRouter triggerRouter : triggerRouters) {
transaction.addRow(null, new Object[] { triggerRouter.getTrigger().getTriggerId(), triggerRouter.getRouter().getRouterId() }, types);
if (++rowCount > maxRowsToFlush) {
transaction.flush();
}
}
transaction.flush();
}

public void deleteAllTriggerRouters() {
sqlTemplate.update(getSql("deleteAllTriggerRoutersSql"));
clearCache();
Expand All @@ -967,43 +1034,83 @@ public void saveTriggerRouter(TriggerRouter triggerRouter) {
saveTriggerRouter(triggerRouter, false);
}

@Override
public void saveTriggerRouter(TriggerRouter triggerRouter, boolean updateTriggerRouterTableOnly) {
if (!updateTriggerRouterTableOnly) {
saveTrigger(triggerRouter.getTrigger());
saveRouter(triggerRouter.getRouter());
}
triggerRouter.setLastUpdateTime(new Date());
if (0 >= sqlTemplate.update(
getSql("updateTriggerRouterSql"),
new Object[] { triggerRouter.getInitialLoadOrder(),
triggerRouter.getInitialLoadSelect(),
triggerRouter.getInitialLoadDeleteStmt(),
triggerRouter.isPingBackEnabled() ? 1 : 0,
triggerRouter.getLastUpdateBy(),
triggerRouter.getLastUpdateTime(),
triggerRouter.isEnabled() ? 1 : 0,
triggerRouter.getTrigger().getTriggerId(),
triggerRouter.getRouter().getRouterId() }, new int[] { Types.NUMERIC,
Types.VARCHAR, Types.VARCHAR, Types.SMALLINT, Types.VARCHAR,
Types.TIMESTAMP, Types.SMALLINT, Types.VARCHAR, Types.VARCHAR })) {
triggerRouter.setCreateTime(triggerRouter.getLastUpdateTime());
sqlTemplate.update(
getSql("insertTriggerRouterSql"),
new Object[] { triggerRouter.getInitialLoadOrder(),
triggerRouter.getInitialLoadSelect(),
triggerRouter.getInitialLoadDeleteStmt(),
triggerRouter.isPingBackEnabled() ? 1 : 0,
triggerRouter.getCreateTime(), triggerRouter.getLastUpdateBy(),
triggerRouter.getLastUpdateTime(),
triggerRouter.isEnabled() ? 1 : 0,
triggerRouter.getTrigger().getTriggerId(),
triggerRouter.getRouter().getRouterId() }, new int[] { Types.NUMERIC,
Types.VARCHAR, Types.VARCHAR, Types.SMALLINT, Types.TIMESTAMP,
Types.VARCHAR, Types.TIMESTAMP, Types.SMALLINT, Types.VARCHAR, Types.VARCHAR });
}
if (0 >= sqlTemplate.update(getSql("updateTriggerRouterSql"), getTriggerRouterSqlValues(triggerRouter),
getTriggerRouterSqlTypes())) {
triggerRouter.setCreateTime(triggerRouter.getLastUpdateTime());
sqlTemplate.update(getSql("insertTriggerRouterSql"), getTriggerRouterSqlValues(triggerRouter),
getTriggerRouterSqlTypes());
}
clearCache();
}

@Override
public void insertTriggerRouters(Collection<TriggerRouter> triggerRouters) {
insertUpdateTriggerRouters(triggerRouters, true, null);
}

@Override
public void insertTriggersAndTriggerRouters(Collection<Trigger> triggers, Collection<TriggerRouter> triggerRouters) {
insertUpdateTriggerRouters(triggerRouters, true, triggers);
}

@Override
public void updateTriggerRouters(Collection<TriggerRouter> triggerRouters) {
insertUpdateTriggerRouters(triggerRouters, false, null);
}

protected void insertUpdateTriggerRouters(Collection<TriggerRouter> triggerRouters, boolean isInsert, Collection<Trigger> triggers) {
ISqlTransaction transaction = null;
try {
int maxRowsToFlush = parameterService.getInt(ParameterConstants.DATA_FLUSH_JDBC_BATCH_SIZE);
transaction = sqlTemplate.startSqlTransaction();
if (triggers != null) {
insertUpdateTriggers(transaction, triggers, isInsert);
}
transaction.prepare(isInsert ? getSql("insertTriggerRouterSql") : getSql("updateTriggerRouterSql"));
int[] types = getTriggerRouterSqlTypes();
int rowCount = 0;
for (TriggerRouter triggerRouter : triggerRouters) {
triggerRouter.setLastUpdateTime(new Date());
if (triggerRouter.getCreateTime() == null) {
triggerRouter.setCreateTime(triggerRouter.getLastUpdateTime());
}
transaction.addRow(null, getTriggerRouterSqlValues(triggerRouter), types);
if (++rowCount > maxRowsToFlush) {
transaction.flush();
}
}
transaction.commit();
} catch (Exception e) {
if (transaction != null) {
transaction.rollback();
}
throw e;
} finally {
close(transaction);
}
clearCache();
}

protected int[] getTriggerRouterSqlTypes() {
return new int[] { Types.NUMERIC, Types.VARCHAR, Types.VARCHAR, Types.SMALLINT, Types.TIMESTAMP, Types.VARCHAR,
Types.TIMESTAMP, Types.SMALLINT, Types.VARCHAR, Types.VARCHAR };
}

protected Object[] getTriggerRouterSqlValues(TriggerRouter triggerRouter) {
return new Object[] { triggerRouter.getInitialLoadOrder(), triggerRouter.getInitialLoadSelect(),
triggerRouter.getInitialLoadDeleteStmt(), triggerRouter.isPingBackEnabled() ? 1 : 0,
triggerRouter.getCreateTime(), triggerRouter.getLastUpdateBy(), triggerRouter.getLastUpdateTime(),
triggerRouter.isEnabled() ? 1 : 0, triggerRouter.getTrigger().getTriggerId(),
triggerRouter.getRouter().getRouterId() };
}

protected void resetTriggerRouterCacheByNodeGroupId() {
cacheManager.flushTriggerRoutersByNodeGroupId();
}
Expand Down Expand Up @@ -1063,62 +1170,90 @@ public void deleteAllRouters() {
sqlTemplate.update(getSql("deleteAllRoutersSql"));
}

@Override
public void saveTrigger(Trigger trigger) {
trigger.setLastUpdateTime(new Date());
trigger.nullOutBlankFields();
if (0 >= sqlTemplate.update(
getSql("updateTriggerSql"),
new Object[] { trigger.getSourceCatalogName(), trigger.getSourceSchemaName(),
trigger.getSourceTableName(), trigger.getChannelId(), trigger.getReloadChannelId(),
trigger.isSyncOnUpdate() ? 1 : 0, trigger.isSyncOnInsert() ? 1 : 0,
trigger.isSyncOnDelete() ? 1 : 0, trigger.isSyncOnIncomingBatch() ? 1 : 0,
trigger.isUseStreamLobs() ? 1 : 0, trigger.isUseCaptureLobs() ? 1 : 0,
trigger.isUseCaptureOldData() ? 1 : 0, trigger.isUseHandleKeyUpdates() ? 1 : 0,
trigger.getNameForUpdateTrigger(), trigger.getNameForInsertTrigger(),
trigger.getNameForDeleteTrigger(), trigger.getSyncOnUpdateCondition(),
trigger.getSyncOnInsertCondition(), trigger.getSyncOnDeleteCondition(),
trigger.getCustomBeforeUpdateText(), trigger.getCustomBeforeInsertText(),
trigger.getCustomBeforeDeleteText(), trigger.getCustomOnUpdateText(),
trigger.getCustomOnInsertText(), trigger.getCustomOnDeleteText(), trigger.getTxIdExpression(),
trigger.getExcludedColumnNames(), trigger.getIncludedColumnNames(), trigger.getSyncKeyNames(),
trigger.getLastUpdateBy(), trigger.getLastUpdateTime(), trigger.getExternalSelect(),
trigger.getChannelExpression(), trigger.isStreamRow(), trigger.getTriggerId() },
new int[] { Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.SMALLINT,
Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT,
Types.SMALLINT, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.SMALLINT, Types.VARCHAR })) {
if (0 >= sqlTemplate.update(getSql("updateTriggerSql"), getTriggerSqlValues(trigger), getTriggerSqlTypes())) {
trigger.setCreateTime(trigger.getLastUpdateTime());
sqlTemplate.update(
getSql("insertTriggerSql"),
new Object[] { trigger.getSourceCatalogName(), trigger.getSourceSchemaName(),
trigger.getSourceTableName(), trigger.getChannelId(), trigger.getReloadChannelId(),
trigger.isSyncOnUpdate() ? 1 : 0, trigger.isSyncOnInsert() ? 1 : 0,
trigger.isSyncOnDelete() ? 1 : 0, trigger.isSyncOnIncomingBatch() ? 1 : 0,
trigger.isUseStreamLobs() ? 1 : 0, trigger.isUseCaptureLobs() ? 1 : 0,
trigger.isUseCaptureOldData() ? 1 : 0, trigger.isUseHandleKeyUpdates() ? 1 : 0,
trigger.getNameForUpdateTrigger(), trigger.getNameForInsertTrigger(),
trigger.getNameForDeleteTrigger(), trigger.getSyncOnUpdateCondition(),
trigger.getSyncOnInsertCondition(), trigger.getSyncOnDeleteCondition(),
trigger.getCustomBeforeUpdateText(), trigger.getCustomBeforeInsertText(),
trigger.getCustomBeforeDeleteText(), trigger.getCustomOnUpdateText(),
trigger.getCustomOnInsertText(), trigger.getCustomOnDeleteText(),
trigger.getTxIdExpression(), trigger.getExcludedColumnNames(),
trigger.getIncludedColumnNames(), trigger.getSyncKeyNames(), trigger.getCreateTime(),
trigger.getLastUpdateBy(), trigger.getLastUpdateTime(), trigger.getExternalSelect(),
trigger.getChannelExpression(), trigger.getTriggerId() },
new int[] { Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT,
Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR });
sqlTemplate.update(getSql("insertTriggerSql"), getTriggerSqlValues(trigger), getTriggerSqlTypes());
}
clearCache();
}

@Override
public void insertTriggers(Collection<Trigger> triggers) {
insertUpdateTriggers(triggers, true);
}

@Override
public void updateTriggers(Collection<Trigger> triggers) {
insertUpdateTriggers(triggers, false);
}

protected void insertUpdateTriggers(Collection<Trigger> triggers, boolean isInsert) {
ISqlTransaction transaction = null;
try {
transaction = sqlTemplate.startSqlTransaction();
insertUpdateTriggers(transaction, triggers, isInsert);
transaction.commit();
} catch (Exception e) {
if (transaction != null) {
transaction.rollback();
}
throw e;
} finally {
close(transaction);
}
clearCache();
}

protected void insertUpdateTriggers(ISqlTransaction transaction, Collection<Trigger> triggers, boolean isInsert) {
int maxRowsToFlush = parameterService.getInt(ParameterConstants.DATA_FLUSH_JDBC_BATCH_SIZE);
transaction.prepare(isInsert ? getSql("insertTriggerSql") : getSql("updateTriggerSql"));
int[] types = getTriggerSqlTypes();
int rowCount = 0;
for (Trigger trigger : triggers) {
trigger.setLastUpdateTime(new Date());
trigger.nullOutBlankFields();
if (trigger.getCreateTime() == null) {
trigger.setCreateTime(trigger.getLastUpdateTime());
}
transaction.addRow(null, getTriggerSqlValues(trigger), types);
if (++rowCount > maxRowsToFlush) {
transaction.flush();
}
}
transaction.flush();
}

protected int[] getTriggerSqlTypes() {
return new int[] { Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.SMALLINT,
Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT,
Types.SMALLINT, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR,
Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.SMALLINT, Types.VARCHAR };
}

protected Object[] getTriggerSqlValues(Trigger trigger) {
return new Object[] { trigger.getSourceCatalogName(), trigger.getSourceSchemaName(),
trigger.getSourceTableName(), trigger.getChannelId(), trigger.getReloadChannelId(),
trigger.isSyncOnUpdate() ? 1 : 0, trigger.isSyncOnInsert() ? 1 : 0, trigger.isSyncOnDelete() ? 1 : 0,
trigger.isSyncOnIncomingBatch() ? 1 : 0, trigger.isUseStreamLobs() ? 1 : 0,
trigger.isUseCaptureLobs() ? 1 : 0, trigger.isUseCaptureOldData() ? 1 : 0,
trigger.isUseHandleKeyUpdates() ? 1 : 0, trigger.getNameForUpdateTrigger(),
trigger.getNameForInsertTrigger(), trigger.getNameForDeleteTrigger(),
trigger.getSyncOnUpdateCondition(), trigger.getSyncOnInsertCondition(),
trigger.getSyncOnDeleteCondition(), trigger.getCustomBeforeUpdateText(),
trigger.getCustomBeforeInsertText(), trigger.getCustomBeforeDeleteText(),
trigger.getCustomOnUpdateText(), trigger.getCustomOnInsertText(), trigger.getCustomOnDeleteText(),
trigger.getTxIdExpression(), trigger.getExcludedColumnNames(), trigger.getIncludedColumnNames(),
trigger.getSyncKeyNames(), trigger.getCreateTime(), trigger.getLastUpdateBy(), trigger.getLastUpdateTime(),
trigger.getExternalSelect(), trigger.getChannelExpression(), trigger.isStreamRow(),
trigger.getTriggerId() };
}

public void syncTriggers() {
syncTriggers(false);
}
Expand Down

0 comments on commit 4178fe0

Please sign in to comment.