Skip to content

Commit

Permalink
0000942: Add a router type of 'audit' that captures changes in an dyn…
Browse files Browse the repository at this point in the history
…amically created audit table.
  • Loading branch information
chenson42 committed Dec 6, 2012
1 parent 4310474 commit 21fdeb5
Show file tree
Hide file tree
Showing 11 changed files with 171 additions and 10 deletions.
35 changes: 35 additions & 0 deletions symmetric-assemble/src/docbook/configuration.xml
Expand Up @@ -296,6 +296,8 @@ values
<para>A node group link can be configured to use the same node group as
the source and the target. This configuration allows a node group to sync
with every other node in its group.</para>
<para>A third type of link action of 'R' for 'Route Only' exists if you want to associate a router with a link that will not move the data.
This action type might be useful when using an XML publishing router or an audit table changes router.</para>
</section>

<section id="configuration-channel">
Expand Down Expand Up @@ -574,6 +576,10 @@ insert into SYM_TRIGGER
changes directly to a messaging solution instead of transmitting
changes to registered nodes. This router must be configured manually
in XML as an extension point.</listitem>

<listitem>Audit Table Router - a router that inserts into an automatically created audit table. It records captured changes
to tables that it is linked to.
</listitem>
</itemizedlist> The mapping between the set of triggers and set of
routers is many-to-many. This means that one trigger can capture changes
and route to multiple locations. It also means that one router can be
Expand Down Expand Up @@ -964,6 +970,35 @@ values
</programlisting></para>
</section>

<section id="configuration-audit-table-router">
<title>Audit Table Router</title>

<para>This router audits captured data by recording the change in an audit table
that the router creates and keeps up to date (as long as <code>auto.config.database</code> is
set to true.) The router creates a table named the same as the table for which
data was captured with the suffix of _AUDIT. It will contain all of the same columns
as the original table with the same data types only each column is nullable with no default
values. </para>

<para>Three extra "AUDIT" columns are added: <itemizedlist>
<listitem>AUDIT_ID - the primary key of the table.</listitem>
<listitem>AUDIT_TIME - the time at which the change occurred.</listitem>
<listitem>AUDIT_EVENT - the DML type that happened to the row.</listitem>
</itemizedlist> </para>

<para>The following is an example of an audit router<programlisting>

insert into SYM_ROUTER
(router_id, source_node_group_id, target_node_group_id, router_type,
create_time, last_update_time)
values
('audit_at_corp','corp', 'local', 'audit', current_timestamp, current_timestamp);
</programlisting></para>

<para>Because the audit router isn't capturing data for a specific node in the system, but it still has to be
associated with a node_group_link a new link action of type 'R' has been introduced. The 'R' stands for 'only routes to'</para>
</section>




Expand Down
Expand Up @@ -875,7 +875,7 @@ public ISequenceService getSequenceService() {

public INodeCommunicationService getNodeCommunicationService() {
return nodeCommunicationService;
}
}

private void removeMeFromMap(Map<String, ISymmetricEngine> map) {
Set<String> keys = new HashSet<String>(map.keySet());
Expand Down
Expand Up @@ -51,6 +51,7 @@
import org.jumpmind.symmetric.service.IRegistrationService;
import org.jumpmind.symmetric.service.IRouterService;
import org.jumpmind.symmetric.service.ISecurityService;
import org.jumpmind.symmetric.service.ISequenceService;
import org.jumpmind.symmetric.service.IStatisticService;
import org.jumpmind.symmetric.service.ITransformService;
import org.jumpmind.symmetric.service.ITriggerRouterService;
Expand Down Expand Up @@ -266,6 +267,8 @@ public interface ISymmetricEngine {

public INodeCommunicationService getNodeCommunicationService();

public ISequenceService getSequenceService();

public String getTablePrefix();

public Logger getLog();
Expand Down
Expand Up @@ -17,20 +17,15 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License. */


package org.jumpmind.symmetric.model;


/**
* Identifies the action to take when the event watcher sees events in the event
* table.
*
*
*/
public enum NodeGroupLinkAction {

P("pushes to"), W("waits for pull from");
P("pushes to"), W("waits for pull from"), R("only routes to");

private String description;

Expand All @@ -44,6 +39,8 @@ public static NodeGroupLinkAction fromCode(String code) {
return P;
} else if (W.name().equals(code)) {
return W;
} else if (R.name().equals(code)) {
return R;
}
}
return null;
Expand Down
Expand Up @@ -132,6 +132,8 @@ public String getTriggerNameForDmlType(DataEventType type) {
return getNameForUpdateTrigger();
case DELETE:
return getNameForDeleteTrigger();
default:
break;
}
throw new IllegalStateException();
}
Expand Down
@@ -0,0 +1,101 @@
package org.jumpmind.symmetric.route;

import java.sql.Types;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.sql.DmlStatement;
import org.jumpmind.db.sql.DmlStatement.DmlType;
import org.jumpmind.db.sql.ISqlTemplate;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.model.DataMetaData;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.TriggerHistory;
import org.jumpmind.symmetric.service.IParameterService;

public class AuditTableDataRouter extends AbstractDataRouter {

private static final String COLUMN_AUDIT_EVENT = "AUDIT_EVENT";

private static final String COLUMN_AUDIT_TIME = "AUDIT_TIME";

private static final String COLUMN_AUDIT_ID = "AUDIT_ID";

private ISymmetricEngine engine;

private Map<String, Table> auditTables = new HashMap<String, Table>();

public AuditTableDataRouter(ISymmetricEngine engine) {
this.engine = engine;
}

public Set<String> routeToNodes(SimpleRouterContext context, DataMetaData dataMetaData,
Set<Node> nodes, boolean initialLoad) {
IParameterService parameterService = engine.getParameterService();
IDatabasePlatform platform = engine.getDatabasePlatform();
TriggerHistory triggerHistory = dataMetaData.getTriggerHistory();
Table table = dataMetaData.getTable().copyAndFilterColumns(
triggerHistory.getParsedColumnNames(), triggerHistory.getParsedPkColumnNames(),
true);
String tableName = table.getFullyQualifiedTableName();
Table auditTable = auditTables.get(tableName);
if (auditTable == null) {
auditTable = toAuditTable(table);
auditTables.put(tableName, auditTable);
if (parameterService.is(ParameterConstants.AUTO_CONFIGURE_DATABASE)) {
platform.alterTables(true, auditTable);
}
}
String auditTableName = auditTable.getFullyQualifiedTableName(platform.getDatabaseInfo()
.getDelimiterToken());

ISqlTemplate template = platform.getSqlTemplate();
Map<String, Object> values = new HashMap<String, Object>(getNewDataAsObject(null,
dataMetaData, engine.getSymmetricDialect()));
Long sequence = (Long)context.get(auditTableName);
if (sequence == null) {
sequence = 1l + template.queryForLong(String.format("select max(%s) from %s", COLUMN_AUDIT_ID,
auditTableName));
} else {
sequence = 1l + sequence;
}
context.put(auditTableName, sequence);
values.put(COLUMN_AUDIT_ID,
sequence);
values.put(COLUMN_AUDIT_TIME, new Date());
values.put(COLUMN_AUDIT_EVENT, dataMetaData.getData().getDataEventType().getCode());
DmlStatement statement = platform.createDmlStatement(DmlType.INSERT, auditTable);
int[] types = statement.getTypes();
Object[] args = statement.getValueArray(values);
String sql = statement.getSql();
template.update(sql, args, types);
return null;
}

protected Table toAuditTable(Table table) {
Table auditTable = table.copy();
String tableName = table.getName();
auditTable.setName(String.format("%s_AUDIT", tableName));
Column[] columns = auditTable.getColumns();
auditTable.removeAllColumns();
auditTable.addColumn(new Column(COLUMN_AUDIT_ID, true, Types.BIGINT, 0, 0));
auditTable.addColumn(new Column(COLUMN_AUDIT_TIME, false, Types.TIMESTAMP, 0, 0));
auditTable.addColumn(new Column(COLUMN_AUDIT_EVENT, false, Types.CHAR, 1, 0));
for (Column column : columns) {
column.setRequired(false);
column.setPrimaryKey(false);
auditTable.addColumn(column);
}
auditTable.removeAllForeignKeys();
auditTable.removeAllIndices();
engine.getDatabasePlatform().alterCaseToMatchDatabaseDefaultCase(auditTable);
return auditTable;
}

}
Expand Up @@ -48,6 +48,7 @@
import org.jumpmind.symmetric.model.OutgoingBatch.Status;
import org.jumpmind.symmetric.model.Router;
import org.jumpmind.symmetric.model.TriggerRouter;
import org.jumpmind.symmetric.route.AuditTableDataRouter;
import org.jumpmind.symmetric.route.BshDataRouter;
import org.jumpmind.symmetric.route.ChannelRouterContext;
import org.jumpmind.symmetric.route.ColumnMatchDataRouter;
Expand Down Expand Up @@ -100,6 +101,7 @@ public RouterService(ISymmetricEngine engine) {
this.routers.put("subselect", new SubSelectDataRouter(symmetricDialect));
this.routers.put("lookuptable", new LookupTableDataRouter(symmetricDialect));
this.routers.put("default", new DefaultDataRouter());
this.routers.put("audit", new AuditTableDataRouter(engine));
this.routers.put("column", new ColumnMatchDataRouter(engine.getConfigurationService(), engine.getSymmetricDialect()));

setSqlMap(new RouterServiceSqlMap(symmetricDialect.getPlatform(),
Expand Down
Expand Up @@ -3,7 +3,6 @@
import java.util.HashMap;
import java.util.Map;

import org.jumpmind.db.sql.AbstractSqlTemplate;
import org.jumpmind.db.sql.ISqlRowMapper;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.Row;
Expand Down
2 changes: 1 addition & 1 deletion symmetric-core/src/main/resources/symmetric-schema.xml
Expand Up @@ -239,7 +239,7 @@
<column name="target_table_name" type="VARCHAR" size="255" description="Optional name for a target table. Only use this if the target table name is different than the source." />
<column name="source_node_group_id" type="VARCHAR" size="50" required="true" description="Routers with this node_group_id will install triggers that are mapped to this router." />
<column name="target_node_group_id" type="VARCHAR" size="50" required="true" description="The node_group_id for nodes to route data to. Note that routing can be further narrowed down by the configured router_type and router_expression." />
<column name="router_type" type="VARCHAR" size="50" description="The name of a specific type of router. Out of the box routers are 'default','column','bsh', and 'subselect.' Custom routers can be configured as extension points." />
<column name="router_type" type="VARCHAR" size="50" description="The name of a specific type of router. Out of the box routers are 'default','column','bsh', 'subselect' and 'audit.' Custom routers can be configured as extension points." />
<column name="router_expression" type="LONGVARCHAR" description="An expression that is specific to the type of router that is configured in router_type. See the documentation for each router for more details." />
<column name="sync_on_update" type="BOOLEANINT" size="1" required="true" default="1" description="Flag that indicates that this router should route updates." />
<column name="sync_on_insert" type="BOOLEANINT" size="1" required="true" default="1" description="Flag that indicates that this router should route inserts." />
Expand Down
Expand Up @@ -20,6 +20,8 @@
*/
package org.jumpmind.db.platform.postgresql;

import java.util.Map;

import org.apache.commons.lang.ArrayUtils;
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.TypeMap;
Expand Down Expand Up @@ -76,6 +78,26 @@ public <T> T[] getValueArray(T[] columnValues, T[] keyValues) {
return super.getValueArray(columnValues, keyValues);
}
}

@Override
public Object[] getValueArray(Map<String, Object> params) {
Object[] args = null;
int index = 0;
if (params != null) {
if (dmlType == DmlType.INSERT) {
args = new Object[columns.length + keys.length];
for (Column column : columns) {
args[index++] = params.get(column.getName());
}
for (Column column : keys) {
args[index++] = params.get(column.getName());
}
} else {
args = super.getValueArray(params);
}
}
return args;
}

@Override
protected int[] buildTypes(Column[] keys, Column[] columns, boolean isDateOverrideToTimestamp) {
Expand Down
Expand Up @@ -326,7 +326,7 @@ public <T> T[] getValueArray(T[] columnValues, T[] keyValues) {
return null;
}

public Object[] buildArgsFrom(Map<String, Object> params) {
public Object[] getValueArray(Map<String, Object> params) {
Object[] args = null;
if (params != null) {
int index = 0;
Expand Down

0 comments on commit 21fdeb5

Please sign in to comment.