Skip to content

Commit

Permalink
Added support for wildcard table names in sym_trigger
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Apr 11, 2012
1 parent c1439cf commit 60fa12a
Show file tree
Hide file tree
Showing 9 changed files with 195 additions and 84 deletions.
Expand Up @@ -22,17 +22,19 @@
import org.jumpmind.symmetric.model.DataEvent;

/**
*
* General purpose constants used by SymmetricDS
*/
final public class Constants {


private Constants() {
}

public static final String WILDCARD = "*";

public static final String STAGING_CATEGORY_OUTGOING = "outgoing";

public static final String STAGING_CATEGORY_INCOMING = "incoming";

private Constants() {
}

public static final String CLIENT_SPRING_XML = "classpath:/symmetric-client.xml";

public static final String SERVER_SPRING_XML = "classpath:/symmetric-server.xml";
Expand Down
Expand Up @@ -107,7 +107,7 @@ public TriggerHistory(Table table, Trigger trigger) {
public TriggerHistory(Table table, Trigger trigger, TriggerReBuildReason reason) {
this();
this.lastTriggerBuildReason = reason;
this.sourceTableName = trigger.getSourceTableName();
this.sourceTableName = trigger.getSourceTableName().contains("*") ? table.getName() : trigger.getSourceTableName();
this.columnNames = Table.getCommaDeliminatedColumns(trigger.orderColumnsForTable(table));
this.sourceSchemaName = trigger.getSourceSchemaName();
this.sourceCatalogName = trigger.getSourceCatalogName();
Expand Down
Expand Up @@ -118,7 +118,7 @@ public interface ITriggerRouterService {

public void inactivateTriggerHistory(TriggerHistory history);

public TriggerHistory getNewestTriggerHistoryForTrigger(String triggerId);
public TriggerHistory getNewestTriggerHistoryForTrigger(String triggerId, String catalogName, String schemaName, String tableName);

public TriggerHistory getTriggerHistory(int historyId);

Expand Down
Expand Up @@ -159,8 +159,9 @@ public void extractConfigurationStandalone(Node node, Writer writer, String... t

for (int i = triggerRouters.size() - 1; i >= 0; i--) {
TriggerRouter triggerRouter = triggerRouters.get(i);
TriggerHistory triggerHistory = triggerRouterService
.getNewestTriggerHistoryForTrigger(triggerRouter.getTrigger().getTriggerId());
TriggerHistory triggerHistory = triggerRouterService.getNewestTriggerHistoryForTrigger(
triggerRouter.getTrigger().getTriggerId(), null, null, triggerRouter
.getTrigger().getSourceTableName());
if (triggerHistory == null) {
Table table = symmetricDialect.getTable(triggerRouter.getTrigger(), false);
if (table == null) {
Expand All @@ -184,8 +185,8 @@ public void extractConfigurationStandalone(Node node, Writer writer, String... t

for (int i = 0; i < triggerRouters.size(); i++) {
TriggerRouter triggerRouter = triggerRouters.get(i);
TriggerHistory triggerHistory = triggerRouterService
.getNewestTriggerHistoryForTrigger(triggerRouter.getTrigger().getTriggerId());
TriggerHistory triggerHistory = triggerRouterService.getNewestTriggerHistoryForTrigger(
triggerRouter.getTrigger().getTriggerId(), null, null, null);
if (triggerHistory == null) {
triggerHistory = new TriggerHistory(symmetricDialect.getTable(
triggerRouter.getTrigger(), false), triggerRouter.getTrigger());
Expand All @@ -208,7 +209,8 @@ public void extractConfigurationStandalone(Node node, Writer writer, String... t
ExtractDataReader dataReader = new ExtractDataReader(this.symmetricDialect.getPlatform(),
source);

ProtocolDataWriter dataWriter = new ProtocolDataWriter(nodeService.findIdentityNodeId(), writer);
ProtocolDataWriter dataWriter = new ProtocolDataWriter(nodeService.findIdentityNodeId(),
writer);
DataProcessor processor = new DataProcessor(dataReader, dataWriter);
processor.process();

Expand Down Expand Up @@ -312,9 +314,12 @@ public void extract(Node targetNode, IOutgoingTransport targetTransport,

if (streamToFileEnabled) {
transformExtractWriter = createTransformDataWriter(identity, targetNode,
new StagingDataWriter(nodeService.findIdentityNodeId(), Constants.STAGING_CATEGORY_OUTGOING, stagingManager));
new StagingDataWriter(nodeService.findIdentityNodeId(),
Constants.STAGING_CATEGORY_OUTGOING, stagingManager));
} else {
transformExtractWriter = createTransformDataWriter(identity, targetNode,
transformExtractWriter = createTransformDataWriter(
identity,
targetNode,
new ProtocolDataWriter(nodeService.findIdentityNodeId(), targetTransport.open()));
}

Expand Down Expand Up @@ -391,7 +396,8 @@ public void extract(Node targetNode, IOutgoingTransport targetTransport,
if (extractedBatch != null) {
IDataReader dataReader = new ProtocolDataReader(extractedBatch);
if (dataWriter == null) {
dataWriter = new ProtocolDataWriter(nodeService.findIdentityNodeId(), targetTransport.open());
dataWriter = new ProtocolDataWriter(
nodeService.findIdentityNodeId(), targetTransport.open());
}
new DataProcessor(dataReader, dataWriter).process();
}
Expand Down Expand Up @@ -460,7 +466,8 @@ public boolean extractBatchRange(Writer writer, long startBatchId, long endBatch
IDataReader dataReader = new ExtractDataReader(symmetricDialect.getPlatform(),
new SelectFromSymDataSource(batch, targetNode));
new DataProcessor(dataReader, createTransformDataWriter(nodeService.findIdentity(),
targetNode, new ProtocolDataWriter(nodeService.findIdentityNodeId(), writer))).process();
targetNode, new ProtocolDataWriter(nodeService.findIdentityNodeId(), writer)))
.process();
foundBatch = true;

}
Expand Down Expand Up @@ -798,8 +805,11 @@ public SelectFromTableEvent(Node node, TriggerRouter triggerRouter,
TriggerHistory triggerHistory) {
this.node = node;
this.triggerRouter = triggerRouter;
Trigger trigger = triggerRouter.getTrigger();
this.triggerHistory = triggerHistory != null ? triggerHistory : triggerRouterService
.getNewestTriggerHistoryForTrigger(triggerRouter.getTrigger().getTriggerId());
.getNewestTriggerHistoryForTrigger(trigger.getTriggerId(),
trigger.getSourceCatalogName(), trigger.getSourceSchemaName(),
trigger.getSourceTableName());
}

public SelectFromTableEvent(Data data) {
Expand Down
Expand Up @@ -147,13 +147,14 @@ public void insertReloadEvent(final Node targetNode, final TriggerRouter trigger
}

private TriggerHistory lookupTriggerHistory(Trigger trigger) {
TriggerHistory history = triggerRouterService.getNewestTriggerHistoryForTrigger(trigger
.getTriggerId());
TriggerHistory history = triggerRouterService.getNewestTriggerHistoryForTrigger(trigger.getTriggerId(),
trigger.getSourceCatalogName(), trigger.getSourceSchemaName(),
trigger.getSourceTableName());

if (history == null) {
triggerRouterService.syncTriggers();
history = triggerRouterService
.getNewestTriggerHistoryForTrigger(trigger.getTriggerId());
.getNewestTriggerHistoryForTrigger(trigger.getTriggerId(), null, null, null);
}

if (history == null) {
Expand All @@ -166,8 +167,11 @@ private TriggerHistory lookupTriggerHistory(Trigger trigger) {
public void insertPurgeEvent(final Node targetNode, final TriggerRouter triggerRouter,
boolean isLoad) {
String sql = symmetricDialect.createPurgeSqlFor(targetNode, triggerRouter);
Trigger trigger = triggerRouter.getTrigger();
TriggerHistory history = triggerRouterService
.getNewestTriggerHistoryForTrigger(triggerRouter.getTrigger().getTriggerId());
.getNewestTriggerHistoryForTrigger(trigger.getTriggerId(),
trigger.getSourceCatalogName(), trigger.getSourceSchemaName(),
trigger.getSourceTableName());
Data data = new Data(history.getSourceTableName(), DataEventType.SQL,
CsvUtils.escapeCsvData(sql), null, history, triggerRouter.getTrigger()
.getChannelId(), null, null);
Expand All @@ -177,8 +181,9 @@ public void insertPurgeEvent(final Node targetNode, final TriggerRouter triggerR

public void insertSqlEvent(final Node targetNode, final Trigger trigger, String sql,
boolean isLoad) {
TriggerHistory history = triggerRouterService.getNewestTriggerHistoryForTrigger(trigger
.getTriggerId());
TriggerHistory history = triggerRouterService.getNewestTriggerHistoryForTrigger(trigger.getTriggerId(),
trigger.getSourceCatalogName(), trigger.getSourceSchemaName(),
trigger.getSourceTableName());
Data data = new Data(history.getSourceTableName(), DataEventType.SQL,
CsvUtils.escapeCsvData(sql), null, history, trigger.getChannelId(), null, null);
insertDataAndDataEventAndOutgoingBatch(data, targetNode.getNodeId(),
Expand Down Expand Up @@ -221,8 +226,11 @@ public void checkForAndUpdateMissingChannelIds(long firstDataId, long lastDataId

public void insertCreateEvent(final Node targetNode, final TriggerRouter triggerRouter,
String xml, boolean isLoad) {
Trigger trigger = triggerRouter.getTrigger();
TriggerHistory history = triggerRouterService
.getNewestTriggerHistoryForTrigger(triggerRouter.getTrigger().getTriggerId());
.getNewestTriggerHistoryForTrigger(trigger.getTriggerId(),
trigger.getSourceCatalogName(), trigger.getSourceSchemaName(),
trigger.getSourceTableName());
Data data = new Data(
triggerRouter.getTrigger().getSourceTableName(),
DataEventType.CREATE,
Expand Down Expand Up @@ -569,7 +577,9 @@ public Data createData(Trigger trigger, String whereClause) {
Data data = null;
if (trigger != null) {
TriggerHistory triggerHistory = triggerRouterService
.getNewestTriggerHistoryForTrigger(trigger.getTriggerId());
.getNewestTriggerHistoryForTrigger(trigger.getTriggerId(),
trigger.getSourceCatalogName(), trigger.getSourceSchemaName(),
trigger.getSourceTableName());
if (triggerHistory == null) {
triggerHistory = triggerRouterService.findTriggerHistory(trigger
.getSourceTableName());
Expand Down

0 comments on commit 60fa12a

Please sign in to comment.