diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/db/hsqldb/HsqlDbDialect.java b/symmetric/src/main/java/org/jumpmind/symmetric/db/hsqldb/HsqlDbDialect.java index 0c4d686663..304633792f 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/db/hsqldb/HsqlDbDialect.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/db/hsqldb/HsqlDbDialect.java @@ -32,6 +32,14 @@ public class HsqlDbDialect extends AbstractDbDialect implements IDbDialect { static final String TRANSACTION_ID_FUNCTION_NAME = "fn_transaction_id"; static final String SYNC_TRIGGERS_DISABLED_USER_VARIABLE = ""; + + ThreadLocal syncEnabled = new ThreadLocal() { + @Override + protected Boolean initialValue() { + return Boolean.TRUE; + } + + }; protected void initForSpecificDialect() { } @@ -66,11 +74,17 @@ public boolean isBlobSyncSupported() { public boolean isClobSyncSupported() { return true; } + + public boolean isSyncEnabled() { + return syncEnabled.get(); + } public void disableSyncTriggers() { + syncEnabled.set(Boolean.FALSE); } public void enableSyncTriggers() { + syncEnabled.set(Boolean.TRUE); } public String getSyncTriggersExpression() { diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/db/hsqldb/HsqlDbTrigger.java b/symmetric/src/main/java/org/jumpmind/symmetric/db/hsqldb/HsqlDbTrigger.java index b24d2a33d7..387a2d7b03 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/db/hsqldb/HsqlDbTrigger.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/db/hsqldb/HsqlDbTrigger.java @@ -19,18 +19,128 @@ */ package org.jumpmind.symmetric.db.hsqldb; +import java.util.List; +import java.util.Map; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.hsqldb.Trigger; +import org.jumpmind.symmetric.SymmetricEngine; +import org.jumpmind.symmetric.common.Constants; +import org.jumpmind.symmetric.model.Data; +import org.jumpmind.symmetric.model.DataEventAction; +import org.jumpmind.symmetric.model.DataEventType; +import org.jumpmind.symmetric.model.Node; +import org.jumpmind.symmetric.model.Trigger; +import org.jumpmind.symmetric.model.TriggerHistory; +import org.jumpmind.symmetric.service.IConfigurationService; +import org.jumpmind.symmetric.service.IDataService; +import org.jumpmind.symmetric.service.INodeService; -public class HsqlDbTrigger implements Trigger { +public class HsqlDbTrigger implements org.hsqldb.Trigger { static final Log logger = LogFactory.getLog(HsqlDbTrigger.class); - - public void fire(int type, String triggerName, String tableName, Object[] oldRow, - Object[] newRolw) { - logger.info("trigger " + triggerName + " fired for " + tableName); - + + public void fire(int type, String triggerName, String tableName, + Object[] oldRow, Object[] newRow) { + try { + logger.info("trigger " + triggerName + " fired for " + tableName); + SymmetricEngine engine = getEngine(triggerName); + IConfigurationService configService = getConfigurationService(engine); + IDataService dataService = getDataService(engine); + INodeService nodeService = getNodeService(engine); + TriggerHistory history = configService + .getHistoryRecordFor(getTriggerHistoryId(triggerName)); + Trigger trigger = configService.getTriggerById(history + .getTriggerId()); + HsqlDbDialect dialect = getDbDialect(engine); + if (trigger.isSyncOnIncomingBatch() || dialect.isSyncEnabled()) { + DataEventType eventType = getDataEventType(type); + Data data = new Data(trigger.getChannelId(), tableName, + eventType, formatRowData(eventType, oldRow, newRow), + formatPkRowData(eventType, oldRow, newRow), history); + + // select nodes from sym_node + + DataEventAction action = configService + .getDataEventActionsByGroupId(trigger + .getSourceGroupId(), trigger.getTargetGroupId()); + List nodes = null; + + if (action != null) { + switch (action) { + case PUSH: + //nodes = nodeService.findNodesToPushTo(); + break; + case WAIT_FOR_POLL: + //nodes = nodeService.findNodesToPull(); + } + } + if (nodes != null) { + dataService.insertDataEvent(data, nodes); + } + + } + } catch (RuntimeException ex) { + logger.error(ex, ex); + throw ex; + } + } + + private String formatRowData(DataEventType type, Object[] oldRow, + Object[] newRow) { + return null; + } + + private String formatPkRowData(DataEventType type, Object[] oldRow, + Object[] newRow) { + return null; + } + + private DataEventType getDataEventType(int type) { + switch (type) { + + case org.hsqldb.Trigger.INSERT_AFTER_ROW: + return DataEventType.INSERT; + case org.hsqldb.Trigger.UPDATE_AFTER_ROW: + return DataEventType.UPDATE; + case org.hsqldb.Trigger.DELETE_AFTER_ROW: + return DataEventType.DELETE; + default: + throw new IllegalStateException("Unexpected trigger type: " + type); + } + } + + private int getTriggerHistoryId(String triggerName) { + return Integer.parseInt(triggerName.substring(triggerName + .lastIndexOf("_") + 1)); + } + + private HsqlDbDialect getDbDialect(SymmetricEngine engine) { + return (HsqlDbDialect) engine.getApplicationContext().getBean( + Constants.DB_DIALECT); + } + + private IConfigurationService getConfigurationService(SymmetricEngine engine) { + return (IConfigurationService) engine.getApplicationContext().getBean( + Constants.CONFIG_SERVICE); + } + + private INodeService getNodeService(SymmetricEngine engine) { + return (INodeService) engine.getApplicationContext().getBean( + Constants.NODE_SERVICE); + } + + private IDataService getDataService(SymmetricEngine engine) { + return (IDataService) engine.getApplicationContext().getBean( + Constants.DATA_SERVICE); + } + + private SymmetricEngine getEngine(String triggerName) { + String minusTriggerId = triggerName.substring(0, triggerName + .lastIndexOf("_")); + String engineName = minusTriggerId.substring(minusTriggerId + .lastIndexOf("_") + 1); + return SymmetricEngine.findEngineByName(engineName.toLowerCase()); } } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/IConfigurationService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/IConfigurationService.java index 2b6e89313c..2fda1697ee 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/IConfigurationService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/IConfigurationService.java @@ -45,7 +45,7 @@ public interface IConfigurationService { public void initSystemChannels(); - public Map getDataEventActionsByGroupId(String groupId); + public DataEventAction getDataEventActionsByGroupId(String sourceGroupId, String targetGroupId); public Map> getTriggersByChannelFor( String configurationTypeId); diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java index 57bca6b94d..3b9669a29d 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java @@ -199,16 +199,12 @@ private boolean isSet(Object value) { } @SuppressWarnings("unchecked") - public Map getDataEventActionsByGroupId( - String nodeGroupId) { - Map results = (Map) jdbcTemplate - .queryForMap(selectDataEventActionsByIdSql, - new Object[] { nodeGroupId }); - Map retMap = new HashMap(); - for (String key : results.keySet()) { - retMap.put(key, DataEventAction.fromCode(results.get(key))); - } - return retMap; + public DataEventAction getDataEventActionsByGroupId(String sourceGroupId, String targetGroupId) { + String code = (String) jdbcTemplate + .queryForObject(selectDataEventActionsByIdSql, + new Object[] { sourceGroupId, targetGroupId }, String.class); + + return DataEventAction.fromCode(code); } @SuppressWarnings("unchecked") diff --git a/symmetric/src/main/resources/symmetric-services.xml b/symmetric/src/main/resources/symmetric-services.xml index 6d33726546..73996b3723 100644 --- a/symmetric/src/main/resources/symmetric-services.xml +++ b/symmetric/src/main/resources/symmetric-services.xml @@ -69,8 +69,8 @@ - select source_node_group_id, data_event_action from - ${sync.table.prefix}_node_group_link where source_node_group_id = ? + select data_event_action from + ${sync.table.prefix}_node_group_link where source_node_group_id = ? and target_node_group_id = ?