diff --git a/symmetric-assemble/src/docbook/configuration.xml b/symmetric-assemble/src/docbook/configuration.xml index e8cd346472..8b807af97c 100644 --- a/symmetric-assemble/src/docbook/configuration.xml +++ b/symmetric-assemble/src/docbook/configuration.xml @@ -296,6 +296,8 @@ values A node group link can be configured to use the same node group as the source and the target. This configuration allows a node group to sync with every other node in its group. + A third type of link action of 'R' for 'Route Only' exists if you want to associate a router with a link that will not move the data. + This action type might be useful when using an XML publishing router or an audit table changes router.
@@ -574,6 +576,10 @@ insert into SYM_TRIGGER changes directly to a messaging solution instead of transmitting changes to registered nodes. This router must be configured manually in XML as an extension point. + + Audit Table Router - a router that inserts into an automatically created audit table. It records captured changes + to tables that it is linked to. + The mapping between the set of triggers and set of routers is many-to-many. This means that one trigger can capture changes and route to multiple locations. It also means that one router can be @@ -964,6 +970,35 @@ values
+
+ Audit Table Router + + This router audits captured data by recording the change in an audit table + that the router creates and keeps up to date (as long as auto.config.database is + set to true.) The router creates a table named the same as the table for which + data was captured with the suffix of _AUDIT. It will contain all of the same columns + as the original table with the same data types only each column is nullable with no default + values. + + Three extra "AUDIT" columns are added: + AUDIT_ID - the primary key of the table. + AUDIT_TIME - the time at which the change occurred. + AUDIT_EVENT - the DML type that happened to the row. + + + The following is an example of an audit router + +insert into SYM_ROUTER + (router_id, source_node_group_id, target_node_group_id, router_type, + create_time, last_update_time) +values + ('audit_at_corp','corp', 'local', 'audit', current_timestamp, current_timestamp); + + + Because the audit router isn't capturing data for a specific node in the system, but it still has to be + associated with a node_group_link a new link action of type 'R' has been introduced. The 'R' stands for 'only routes to' +
+ diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java index 7b5759c3e1..eee13a6f59 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java @@ -875,7 +875,7 @@ public ISequenceService getSequenceService() { public INodeCommunicationService getNodeCommunicationService() { return nodeCommunicationService; - } + } private void removeMeFromMap(Map map) { Set keys = new HashSet(map.keySet()); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/ISymmetricEngine.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/ISymmetricEngine.java index c76949359f..50453d92ec 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/ISymmetricEngine.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/ISymmetricEngine.java @@ -51,6 +51,7 @@ import org.jumpmind.symmetric.service.IRegistrationService; import org.jumpmind.symmetric.service.IRouterService; import org.jumpmind.symmetric.service.ISecurityService; +import org.jumpmind.symmetric.service.ISequenceService; import org.jumpmind.symmetric.service.IStatisticService; import org.jumpmind.symmetric.service.ITransformService; import org.jumpmind.symmetric.service.ITriggerRouterService; @@ -266,6 +267,8 @@ public interface ISymmetricEngine { public INodeCommunicationService getNodeCommunicationService(); + public ISequenceService getSequenceService(); + public String getTablePrefix(); public Logger getLog(); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/NodeGroupLinkAction.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/NodeGroupLinkAction.java index 431be2ab70..6b455b1d6c 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/NodeGroupLinkAction.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/NodeGroupLinkAction.java @@ -17,20 +17,15 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ - - package org.jumpmind.symmetric.model; - /** * Identifies the action to take when the event watcher sees events in the event * table. - * - * */ public enum NodeGroupLinkAction { - P("pushes to"), W("waits for pull from"); + P("pushes to"), W("waits for pull from"), R("only routes to"); private String description; @@ -44,6 +39,8 @@ public static NodeGroupLinkAction fromCode(String code) { return P; } else if (W.name().equals(code)) { return W; + } else if (R.name().equals(code)) { + return R; } } return null; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TriggerHistory.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TriggerHistory.java index 4c4cfebb51..da5590f640 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TriggerHistory.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TriggerHistory.java @@ -132,6 +132,8 @@ public String getTriggerNameForDmlType(DataEventType type) { return getNameForUpdateTrigger(); case DELETE: return getNameForDeleteTrigger(); + default: + break; } throw new IllegalStateException(); } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/AuditTableDataRouter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/AuditTableDataRouter.java new file mode 100644 index 0000000000..b55062c0d3 --- /dev/null +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/AuditTableDataRouter.java @@ -0,0 +1,101 @@ +package org.jumpmind.symmetric.route; + +import java.sql.Types; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.jumpmind.db.model.Column; +import org.jumpmind.db.model.Table; +import org.jumpmind.db.platform.IDatabasePlatform; +import org.jumpmind.db.sql.DmlStatement; +import org.jumpmind.db.sql.DmlStatement.DmlType; +import org.jumpmind.db.sql.ISqlTemplate; +import org.jumpmind.symmetric.ISymmetricEngine; +import org.jumpmind.symmetric.common.ParameterConstants; +import org.jumpmind.symmetric.model.DataMetaData; +import org.jumpmind.symmetric.model.Node; +import org.jumpmind.symmetric.model.TriggerHistory; +import org.jumpmind.symmetric.service.IParameterService; + +public class AuditTableDataRouter extends AbstractDataRouter { + + private static final String COLUMN_AUDIT_EVENT = "AUDIT_EVENT"; + + private static final String COLUMN_AUDIT_TIME = "AUDIT_TIME"; + + private static final String COLUMN_AUDIT_ID = "AUDIT_ID"; + + private ISymmetricEngine engine; + + private Map auditTables = new HashMap(); + + public AuditTableDataRouter(ISymmetricEngine engine) { + this.engine = engine; + } + + public Set routeToNodes(SimpleRouterContext context, DataMetaData dataMetaData, + Set nodes, boolean initialLoad) { + IParameterService parameterService = engine.getParameterService(); + IDatabasePlatform platform = engine.getDatabasePlatform(); + TriggerHistory triggerHistory = dataMetaData.getTriggerHistory(); + Table table = dataMetaData.getTable().copyAndFilterColumns( + triggerHistory.getParsedColumnNames(), triggerHistory.getParsedPkColumnNames(), + true); + String tableName = table.getFullyQualifiedTableName(); + Table auditTable = auditTables.get(tableName); + if (auditTable == null) { + auditTable = toAuditTable(table); + auditTables.put(tableName, auditTable); + if (parameterService.is(ParameterConstants.AUTO_CONFIGURE_DATABASE)) { + platform.alterTables(true, auditTable); + } + } + String auditTableName = auditTable.getFullyQualifiedTableName(platform.getDatabaseInfo() + .getDelimiterToken()); + + ISqlTemplate template = platform.getSqlTemplate(); + Map values = new HashMap(getNewDataAsObject(null, + dataMetaData, engine.getSymmetricDialect())); + Long sequence = (Long)context.get(auditTableName); + if (sequence == null) { + sequence = 1l + template.queryForLong(String.format("select max(%s) from %s", COLUMN_AUDIT_ID, + auditTableName)); + } else { + sequence = 1l + sequence; + } + context.put(auditTableName, sequence); + values.put(COLUMN_AUDIT_ID, + sequence); + values.put(COLUMN_AUDIT_TIME, new Date()); + values.put(COLUMN_AUDIT_EVENT, dataMetaData.getData().getDataEventType().getCode()); + DmlStatement statement = platform.createDmlStatement(DmlType.INSERT, auditTable); + int[] types = statement.getTypes(); + Object[] args = statement.getValueArray(values); + String sql = statement.getSql(); + template.update(sql, args, types); + return null; + } + + protected Table toAuditTable(Table table) { + Table auditTable = table.copy(); + String tableName = table.getName(); + auditTable.setName(String.format("%s_AUDIT", tableName)); + Column[] columns = auditTable.getColumns(); + auditTable.removeAllColumns(); + auditTable.addColumn(new Column(COLUMN_AUDIT_ID, true, Types.BIGINT, 0, 0)); + auditTable.addColumn(new Column(COLUMN_AUDIT_TIME, false, Types.TIMESTAMP, 0, 0)); + auditTable.addColumn(new Column(COLUMN_AUDIT_EVENT, false, Types.CHAR, 1, 0)); + for (Column column : columns) { + column.setRequired(false); + column.setPrimaryKey(false); + auditTable.addColumn(column); + } + auditTable.removeAllForeignKeys(); + auditTable.removeAllIndices(); + engine.getDatabasePlatform().alterCaseToMatchDatabaseDefaultCase(auditTable); + return auditTable; + } + +} diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java index a05860fe2d..0dea8aabdc 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java @@ -48,6 +48,7 @@ import org.jumpmind.symmetric.model.OutgoingBatch.Status; import org.jumpmind.symmetric.model.Router; import org.jumpmind.symmetric.model.TriggerRouter; +import org.jumpmind.symmetric.route.AuditTableDataRouter; import org.jumpmind.symmetric.route.BshDataRouter; import org.jumpmind.symmetric.route.ChannelRouterContext; import org.jumpmind.symmetric.route.ColumnMatchDataRouter; @@ -100,6 +101,7 @@ public RouterService(ISymmetricEngine engine) { this.routers.put("subselect", new SubSelectDataRouter(symmetricDialect)); this.routers.put("lookuptable", new LookupTableDataRouter(symmetricDialect)); this.routers.put("default", new DefaultDataRouter()); + this.routers.put("audit", new AuditTableDataRouter(engine)); this.routers.put("column", new ColumnMatchDataRouter(engine.getConfigurationService(), engine.getSymmetricDialect())); setSqlMap(new RouterServiceSqlMap(symmetricDialect.getPlatform(), diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/SequenceService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/SequenceService.java index 797ae95369..18584efc8c 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/SequenceService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/SequenceService.java @@ -3,7 +3,6 @@ import java.util.HashMap; import java.util.Map; -import org.jumpmind.db.sql.AbstractSqlTemplate; import org.jumpmind.db.sql.ISqlRowMapper; import org.jumpmind.db.sql.ISqlTransaction; import org.jumpmind.db.sql.Row; diff --git a/symmetric-core/src/main/resources/symmetric-schema.xml b/symmetric-core/src/main/resources/symmetric-schema.xml index 28e1564ca7..24db608d20 100644 --- a/symmetric-core/src/main/resources/symmetric-schema.xml +++ b/symmetric-core/src/main/resources/symmetric-schema.xml @@ -239,7 +239,7 @@ - + diff --git a/symmetric-db/src/main/java/org/jumpmind/db/platform/postgresql/PostgreSqlDmlStatement.java b/symmetric-db/src/main/java/org/jumpmind/db/platform/postgresql/PostgreSqlDmlStatement.java index 78013c4a4a..00e56b3a5a 100644 --- a/symmetric-db/src/main/java/org/jumpmind/db/platform/postgresql/PostgreSqlDmlStatement.java +++ b/symmetric-db/src/main/java/org/jumpmind/db/platform/postgresql/PostgreSqlDmlStatement.java @@ -20,6 +20,8 @@ */ package org.jumpmind.db.platform.postgresql; +import java.util.Map; + import org.apache.commons.lang.ArrayUtils; import org.jumpmind.db.model.Column; import org.jumpmind.db.model.TypeMap; @@ -76,6 +78,26 @@ public T[] getValueArray(T[] columnValues, T[] keyValues) { return super.getValueArray(columnValues, keyValues); } } + + @Override + public Object[] getValueArray(Map params) { + Object[] args = null; + int index = 0; + if (params != null) { + if (dmlType == DmlType.INSERT) { + args = new Object[columns.length + keys.length]; + for (Column column : columns) { + args[index++] = params.get(column.getName()); + } + for (Column column : keys) { + args[index++] = params.get(column.getName()); + } + } else { + args = super.getValueArray(params); + } + } + return args; + } @Override protected int[] buildTypes(Column[] keys, Column[] columns, boolean isDateOverrideToTimestamp) { diff --git a/symmetric-db/src/main/java/org/jumpmind/db/sql/DmlStatement.java b/symmetric-db/src/main/java/org/jumpmind/db/sql/DmlStatement.java index 3c69028e64..e3066820f4 100644 --- a/symmetric-db/src/main/java/org/jumpmind/db/sql/DmlStatement.java +++ b/symmetric-db/src/main/java/org/jumpmind/db/sql/DmlStatement.java @@ -326,7 +326,7 @@ public T[] getValueArray(T[] columnValues, T[] keyValues) { return null; } - public Object[] buildArgsFrom(Map params) { + public Object[] getValueArray(Map params) { Object[] args = null; if (params != null) { int index = 0;