Skip to content

Commit

Permalink
0001748: Remove auto increment from sym_trigger_hist and sym_extract_request. Use sym_sequence to get the pks.
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Jun 5, 2014
1 parent 6cad0ea commit 3e884ec
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 55 deletions.
Expand Up @@ -297,10 +297,8 @@ protected void init() {
this.dataService = new DataService(this);
this.routerService = buildRouterService();
this.nodeCommunicationService = buildNodeCommunicationService(clusterService, nodeService, parameterService, symmetricDialect);
this.dataExtractorService = new DataExtractorService(parameterService, symmetricDialect,
outgoingBatchService, routerService, configurationService, triggerRouterService,
nodeService, dataService, transformService, statisticManager, stagingManager, clusterService, nodeCommunicationService);
this.incomingBatchService = new IncomingBatchService(parameterService, symmetricDialect, clusterService);
this.dataExtractorService = new DataExtractorService(this);
this.transportManager = new TransportManagerFactory(this).create();
this.dataLoaderService = new DataLoaderService(this);
this.registrationService = new RegistrationService(parameterService, symmetricDialect,
Expand Down
Expand Up @@ -140,6 +140,10 @@ private Constants() {
public static final String TRANSPORT_HTTPS_VERIFIED_SERVERS_ALL="all";

// Bean/lookup name for the transform service.
public static final String TRANSFORM_SERVICE = "transformService";

// sym_sequence entry name used to generate trigger_hist_id primary keys
// (replaces the former auto-increment column on sym_trigger_hist).
public static final String SEQUENCE_TRIGGER_HIST = TableConstants.SYM_TRIGGER_HIST;

// sym_sequence entry name used to generate request_id primary keys
// (replaces the former auto-increment column on sym_extract_request).
public static final String SEQUENCE_EXTRACT_REQ = TableConstants.SYM_EXTRACT_REQUEST;

// sym_sequence entry name used to generate outgoing batch ids.
public static final String SEQUENCE_OUTGOING_BATCH = TableConstants.SYM_OUTGOING_BATCH;


Expand Down
Expand Up @@ -47,13 +47,12 @@
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.Row;
import org.jumpmind.db.sql.SqlConstants;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.SymmetricException;
import org.jumpmind.symmetric.Version;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.common.TableConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.db.SequenceIdentifier;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.Batch.BatchType;
import org.jumpmind.symmetric.io.data.CsvData;
Expand Down Expand Up @@ -112,8 +111,8 @@
import org.jumpmind.symmetric.service.INodeCommunicationService.INodeCommunicationExecutor;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IOutgoingBatchService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.service.IRouterService;
import org.jumpmind.symmetric.service.ISequenceService;
import org.jumpmind.symmetric.service.ITransformService;
import org.jumpmind.symmetric.service.ITriggerRouterService;
import org.jumpmind.symmetric.service.impl.TransformService.TransformTableNodeGroupLink;
Expand Down Expand Up @@ -141,6 +140,8 @@ protected enum ExtractMode { FOR_SYM_CLIENT, FOR_PAYLOAD_CLIENT, EXTRACT_ONLY };
// Resolves triggers/routers when building extract requests.
private ITriggerRouterService triggerRouterService;

// Applies configured transforms while extracting.
private ITransformService transformService;

// Source of sym_sequence-generated primary keys (e.g. extract request ids).
private ISequenceService sequenceService;

// Access to captured data rows.
private IDataService dataService;

Expand All @@ -156,26 +157,21 @@ protected enum ExtractMode { FOR_SYM_CLIENT, FOR_PAYLOAD_CLIENT, EXTRACT_ONLY };

private Map<String, Semaphore> locks = new HashMap<String, Semaphore>();

public DataExtractorService(IParameterService parameterService,
ISymmetricDialect symmetricDialect, IOutgoingBatchService outgoingBatchService,
IRouterService routingService, IConfigurationService configurationService,
ITriggerRouterService triggerRouterService, INodeService nodeService,
IDataService dataService, ITransformService transformService,
IStatisticManager statisticManager, IStagingManager stagingManager,
IClusterService clusterService, INodeCommunicationService nodeCommunicationService) {
super(parameterService, symmetricDialect);
this.outgoingBatchService = outgoingBatchService;
this.routerService = routingService;
this.dataService = dataService;
this.configurationService = configurationService;
this.triggerRouterService = triggerRouterService;
this.nodeService = nodeService;
this.transformService = transformService;
this.statisticManager = statisticManager;
this.stagingManager = stagingManager;
this.nodeCommunicationService = nodeCommunicationService;
this.clusterService = clusterService;
setSqlMap(new DataExtractorSqlMap(symmetricDialect.getPlatform(),
/**
 * Builds the extractor service from the engine facade, pulling each
 * collaborating service off of the engine instead of taking a long
 * parameter list (replaces the former 12-argument constructor).
 *
 * @param engine the fully wired engine supplying all dependent services
 */
public DataExtractorService(ISymmetricEngine engine) {
// Parent needs the parameter service and dialect for sql/replacement setup.
super(engine.getParameterService(), engine.getSymmetricDialect());
this.outgoingBatchService = engine.getOutgoingBatchService();
this.routerService = engine.getRouterService();
this.dataService = engine.getDataService();
this.configurationService = engine.getConfigurationService();
this.triggerRouterService = engine.getTriggerRouterService();
this.nodeService = engine.getNodeService();
this.transformService = engine.getTransformService();
this.statisticManager = engine.getStatisticManager();
this.stagingManager = engine.getStagingManager();
this.nodeCommunicationService = engine.getNodeCommunicationService();
this.clusterService = engine.getClusterService();
// Sequence service supplies pks now that auto increment was removed
// from sym_extract_request.
this.sequenceService = engine.getSequenceService();
setSqlMap(new DataExtractorServiceSqlMap(symmetricDialect.getPlatform(),
createSqlReplacementTokens()));
}

Expand Down Expand Up @@ -996,13 +992,11 @@ protected void resetExtractRequest(OutgoingBatch batch) {

public void requestExtractRequest(ISqlTransaction transaction, String nodeId,
TriggerRouter triggerRouter, long startBatchId, long endBatchId) {
transaction.insertWithGeneratedKey(
getSql("insertExtractRequestSql"),
symmetricDialect.getSequenceKeyName(SequenceIdentifier.REQUEST),
symmetricDialect.getSequenceName(SequenceIdentifier.REQUEST), new Object[] {
nodeId, ExtractStatus.NE.name(), startBatchId, endBatchId,
triggerRouter.getTrigger().getTriggerId(),
triggerRouter.getRouter().getRouterId() }, new int[] { Types.VARCHAR,
long requestId = sequenceService.nextVal(transaction, Constants.SEQUENCE_EXTRACT_REQ);
transaction.prepareAndExecute(getSql("insertExtractRequestSql"),
new Object[] { requestId, nodeId, ExtractStatus.NE.name(), startBatchId,
endBatchId, triggerRouter.getTrigger().getTriggerId(),
triggerRouter.getRouter().getRouterId() }, new int[] { Types.BIGINT, Types.VARCHAR,
Types.VARCHAR, Types.BIGINT, Types.BIGINT, Types.VARCHAR, Types.VARCHAR });
}

Expand Down
Expand Up @@ -4,9 +4,9 @@

import org.jumpmind.db.platform.IDatabasePlatform;

public class DataExtractorSqlMap extends AbstractSqlMap {
public class DataExtractorServiceSqlMap extends AbstractSqlMap {

public DataExtractorSqlMap(IDatabasePlatform platform,
public DataExtractorServiceSqlMap(IDatabasePlatform platform,
Map<String, String> replacementTokens) {
super(platform, replacementTokens);

Expand All @@ -15,7 +15,7 @@ public DataExtractorSqlMap(IDatabasePlatform platform,

putSql("selectExtractRequestForNodeSql", "select * from $(extract_request) where node_id=? and status=? order by request_id");

putSql("insertExtractRequestSql", "insert into $(extract_request) (request_id, node_id, status, start_batch_id, end_batch_id, trigger_id, router_id, last_update_time, create_time) values(null, ?, ?, ?, ?, ?, ?, current_timestamp, current_timestamp)");
putSql("insertExtractRequestSql", "insert into $(extract_request) (request_id, node_id, status, start_batch_id, end_batch_id, trigger_id, router_id, last_update_time, create_time) values(?, ?, ?, ?, ?, ?, ?, current_timestamp, current_timestamp)");

putSql("updateExtractRequestStatus", "update $(extract_request) set status=? where request_id=?");

Expand Down
Expand Up @@ -48,26 +48,29 @@ public SequenceService(IParameterService parameterService, ISymmetricDialect sym
}

/**
 * Seeds the sym_sequence rows this engine depends on. Each sequence is
 * initialized to one past the current max key in its backing table so
 * newly generated pks never collide with existing rows (the max* queries
 * select max(id)+1). initSequence() is a no-op if the row already exists.
 */
public void init() {
    initSequence(Constants.SEQUENCE_OUTGOING_BATCH_LOAD_ID, 1);

    long maxBatchId = sqlTemplate.queryForLong(getSql("maxOutgoingBatchSql"));
    if (maxBatchId < 1) {
        maxBatchId = 1;
    }
    initSequence(Constants.SEQUENCE_OUTGOING_BATCH, maxBatchId);

    long maxTriggerHistId = sqlTemplate.queryForLong(getSql("maxTriggerHistSql"));
    initSequence(Constants.SEQUENCE_TRIGGER_HIST, maxTriggerHistId);

    long maxRequestId = sqlTemplate.queryForLong(getSql("maxExtractRequestSql"));
    initSequence(Constants.SEQUENCE_EXTRACT_REQ, maxRequestId);
}

private void initSequence(String name, long initialValue) {
try {
create(new Sequence(Constants.SEQUENCE_OUTGOING_BATCH_LOAD_ID, 1, 1, 1, 9999999999l,
if (initialValue < 1) {
initialValue = 1;
}
create(new Sequence(name, initialValue, 1, 1, 9999999999l,
"system", false));
} catch (UniqueKeyException ex) {
log.debug("Failed to create sequence {}. Must be initialized already.",
Constants.SEQUENCE_OUTGOING_BATCH_LOAD_ID);
name);
}

}

public long nextVal(String name) {
Expand Down
Expand Up @@ -48,6 +48,11 @@ public SequenceServiceSqlMap(IDatabasePlatform platform, Map<String, String> rep
" values(?,?,?,?,?,?,current_timestamp,?,current_timestamp) ");

putSql("maxOutgoingBatchSql", "select max(batch_id)+1 from $(outgoing_batch)");

putSql("maxTriggerHistSql", "select max(trigger_hist_id)+1 from $(trigger_hist)");

putSql("maxExtractRequestSql", "select max(request_id)+1 from $(extract_request)");


// @formatter:on
}
Expand Down
Expand Up @@ -63,6 +63,7 @@
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.IGroupletService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.ISequenceService;
import org.jumpmind.symmetric.service.ITriggerRouterService;
import org.jumpmind.symmetric.statistic.IStatisticManager;
import org.jumpmind.util.FormatUtils;
Expand All @@ -75,6 +76,8 @@ public class TriggerRouterService extends AbstractService implements ITriggerRou
private IClusterService clusterService;

private IConfigurationService configurationService;

private ISequenceService sequenceService;

private Map<String, Router> routersCache;

Expand Down Expand Up @@ -121,6 +124,7 @@ public TriggerRouterService(ISymmetricEngine engine) {
this.statisticManager = engine.getStatisticManager();
this.groupletService = engine.getGroupletService();
this.nodeService = engine.getNodeService();
this.sequenceService = engine.getSequenceService();
this.addTriggerCreationListeners(this.failureListener);
setSqlMap(new TriggerRouterServiceSqlMap(symmetricDialect.getPlatform(),
createSqlReplacementTokens()));
Expand Down Expand Up @@ -818,9 +822,10 @@ public TriggerRouter mapRow(Row rs) {
}

public void insert(TriggerHistory newHistRecord) {
newHistRecord.setTriggerHistoryId((int)sequenceService.nextVal(Constants.SEQUENCE_TRIGGER_HIST));
sqlTemplate.update(
getSql("insertTriggerHistorySql"),
new Object[] { newHistRecord.getTriggerId(), newHistRecord.getSourceTableName(),
new Object[] { newHistRecord.getTriggerHistoryId(), newHistRecord.getTriggerId(), newHistRecord.getSourceTableName(),
newHistRecord.getTableHash(), newHistRecord.getCreateTime(),
newHistRecord.getColumnNames(), newHistRecord.getPkColumnNames(),
newHistRecord.getLastTriggerBuildReason().getCode(),
Expand All @@ -830,7 +835,7 @@ public void insert(TriggerHistory newHistRecord) {
newHistRecord.getSourceSchemaName(), newHistRecord.getSourceCatalogName(),
newHistRecord.getTriggerRowHash(), newHistRecord.getTriggerTemplateHash(),
newHistRecord.getErrorMessage() },
new int[] { Types.VARCHAR, Types.VARCHAR, Types.BIGINT, Types.TIMESTAMP,
new int[] { Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.BIGINT, Types.TIMESTAMP,
Types.VARCHAR, Types.VARCHAR, Types.CHAR, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.BIGINT, Types.BIGINT,
Types.VARCHAR });
Expand Down
Expand Up @@ -121,8 +121,8 @@ public TriggerRouterServiceSqlMap(IDatabasePlatform platform,
putSql("insertTriggerHistorySql",
""
+ "insert into $(trigger_hist) "
+ " (trigger_id,source_table_name,table_hash,create_time,column_names,pk_column_names,last_trigger_build_reason,name_for_delete_trigger,name_for_insert_trigger,name_for_update_trigger,source_schema_name,source_catalog_name,trigger_row_hash,trigger_template_hash,error_message) "
+ " values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ");
+ " (trigger_hist_id, trigger_id,source_table_name,table_hash,create_time,column_names,pk_column_names,last_trigger_build_reason,name_for_delete_trigger,name_for_insert_trigger,name_for_update_trigger,source_schema_name,source_catalog_name,trigger_row_hash,trigger_template_hash,error_message) "
+ " values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ");

putSql("deleteTriggerSql", "" + "delete from $(trigger) where trigger_id=? ");

Expand Down
4 changes: 2 additions & 2 deletions symmetric-core/src/main/resources/symmetric-schema.xml
Expand Up @@ -113,7 +113,7 @@
</table>

<table name="extract_request" description="This table is used internally to request the extract of initial loads asynchronously when the initial load extract job is enabled.">
<column name="request_id" type="BIGINT" required="true" primaryKey="true" autoIncrement="true" description="Unique identifier for a request." />
<column name="request_id" type="BIGINT" required="true" primaryKey="true" description="Unique identifier for a request." />
<column name="node_id" type="VARCHAR" size="50" required="true" description="The node_id of the batch being loaded." />
<column name="status" type="CHAR" size="2" description="NE, OK" />
<column name="start_batch_id" type="BIGINT" required="true" description="A load can be split across multiple batches. This is the first of N batches the load will be split across." />
Expand Down Expand Up @@ -708,7 +708,7 @@
</table>

<table name="trigger_hist" description="A history of a table's definition and the trigger used to capture data from the table. When a database trigger captures a data change, it references a trigger_hist entry so it is possible to know which columns the data represents. trigger_hist entries are made during the sync trigger process, which runs at each startup, each night in the syncTriggersJob, or any time the syncTriggers() JMX method is manually invoked. A new entry is made when a table definition or a trigger definition is changed, which causes a database trigger to be created or rebuilt.">
<column name="trigger_hist_id" type="INTEGER" required="true" primaryKey="true" autoIncrement="true" description="Unique identifier for a trigger_hist entry" />
<column name="trigger_hist_id" type="INTEGER" required="true" primaryKey="true" description="Unique identifier for a trigger_hist entry" />
<column name="trigger_id" type="VARCHAR" size="128" required="true" description="Unique identifier for a trigger" />
<column name="source_table_name" type="VARCHAR" size="255" required="true" description="The name of the source table that will have a trigger installed to watch for data changes." />
<column name="source_catalog_name" type="VARCHAR" size="255" description="The catalog name where the source table resides." />
Expand Down

0 comments on commit 3e884ec

Please sign in to comment.