Skip to content

Commit

Permalink
0001920: Redshift database dialect - fix reloads from redshift source
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Aug 25, 2014
1 parent 31187c6 commit f5d2885
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 33 deletions.
Expand Up @@ -33,6 +33,7 @@ public class RedshiftSymmetricDialect extends AbstractSymmetricDialect implement

public RedshiftSymmetricDialect(IParameterService parameterService, IDatabasePlatform platform) {
super(parameterService, platform);
triggerTemplate = new RedshiftTriggerTemplate(this);
}

@Override
Expand Down
@@ -0,0 +1,51 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.db.redshift;

import java.util.HashMap;

import org.jumpmind.symmetric.db.AbstractTriggerTemplate;
import org.jumpmind.symmetric.db.ISymmetricDialect;

public class RedshiftTriggerTemplate extends AbstractTriggerTemplate {

protected RedshiftTriggerTemplate(ISymmetricDialect symmetricDialect) {
super(symmetricDialect);

emptyColumnTemplate = "''";
stringColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then '' else '\"' || replace(replace($(tableAlias).\"$(columnName)\",$$\\$$,$$\\\\$$),'\"',$$\\\"$$) || '\"' end";
numberColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then '' else '\"' || cast($(tableAlias).\"$(columnName)\" as varchar) || '\"' end";
datetimeColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then '' else '\"' || to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS.MS') || '\"' end";
booleanColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then '' when $(tableAlias).\"$(columnName)\" then '\"1\"' else '\"0\"' end";
triggerConcatCharacter = "||";
newTriggerValue = "new";
oldTriggerValue = "old";
oldColumnPrefix = "";
newColumnPrefix = "";

sqlTemplates = new HashMap<String,String>();
sqlTemplates.put("insertTriggerTemplate", "");
sqlTemplates.put("updateTriggerTemplate", "");
sqlTemplates.put("deleteTriggerTemplate", "");
sqlTemplates.put("initialLoadSqlTemplate", "select $(columns) from $(schemaName)$(tableName) t where $(whereClause)");
}

}
Expand Up @@ -75,23 +75,21 @@ public ClusterService(IParameterService parameterService, ISymmetricDialect dial

public void init() {
sqlTemplate.update(getSql("initLockSql"), new Object[] { getServerId() });
initLockTable(ROUTE);
initLockTable(PULL);
initLockTable(PUSH);
initLockTable(HEARTBEAT);
initLockTable(PURGE_INCOMING);
initLockTable(PURGE_OUTGOING);
initLockTable(PURGE_STATISTICS);
initLockTable(SYNCTRIGGERS);
initLockTable(PURGE_DATA_GAPS);
initLockTable(STAGE_MANAGEMENT);
initLockTable(WATCHDOG);
initLockTable(STATISTICS);
initLockTable(FILE_SYNC_PULL);
initLockTable(FILE_SYNC_PUSH);
initLockTable(FILE_SYNC_TRACKER);
initLockTable(FILE_SYNC_SHARED, TYPE_SHARED);
initLockTable(INITIAL_LOAD_EXTRACT);

Map<String, Lock> allLocks = findLocks();
for (String action : new String[] { ROUTE, PULL, PUSH, HEARTBEAT, PURGE_INCOMING, PURGE_OUTGOING, PURGE_STATISTICS, SYNCTRIGGERS,
PURGE_DATA_GAPS, STAGE_MANAGEMENT, WATCHDOG, STATISTICS, FILE_SYNC_PULL, FILE_SYNC_PUSH, FILE_SYNC_TRACKER,
INITIAL_LOAD_EXTRACT }) {
if (allLocks.get(action) == null) {
initLockTable(action, TYPE_CLUSTER);
}
}

for (String action : new String[] { FILE_SYNC_SHARED }) {
if (allLocks.get(action) == null) {
initLockTable(action, TYPE_SHARED);
}
}
}

public void initLockTable(final String action) {
Expand Down
Expand Up @@ -48,16 +48,25 @@ public SequenceService(IParameterService parameterService, ISymmetricDialect sym
}

public void init() {
initSequence(Constants.SEQUENCE_OUTGOING_BATCH_LOAD_ID, 1);
Map<String, Sequence> sequences = getAll();
if (sequences.get(Constants.SEQUENCE_OUTGOING_BATCH_LOAD_ID) == null) {
initSequence(Constants.SEQUENCE_OUTGOING_BATCH_LOAD_ID, 1);
}

long maxBatchId = sqlTemplate.queryForLong(getSql("maxOutgoingBatchSql"));
initSequence(Constants.SEQUENCE_OUTGOING_BATCH, maxBatchId);
if (sequences.get(Constants.SEQUENCE_OUTGOING_BATCH) == null) {
long maxBatchId = sqlTemplate.queryForLong(getSql("maxOutgoingBatchSql"));
initSequence(Constants.SEQUENCE_OUTGOING_BATCH, maxBatchId);
}

long maxTriggerHistId = sqlTemplate.queryForLong(getSql("maxTriggerHistSql"));
initSequence(Constants.SEQUENCE_TRIGGER_HIST, maxTriggerHistId);
if (sequences.get(Constants.SEQUENCE_TRIGGER_HIST) == null) {
long maxTriggerHistId = sqlTemplate.queryForLong(getSql("maxTriggerHistSql"));
initSequence(Constants.SEQUENCE_TRIGGER_HIST, maxTriggerHistId);
}

long maxRequestId = sqlTemplate.queryForLong(getSql("maxExtractRequestSql"));
initSequence(Constants.SEQUENCE_EXTRACT_REQ, maxRequestId);
if (sequences.get(Constants.SEQUENCE_EXTRACT_REQ) == null) {
long maxRequestId = sqlTemplate.queryForLong(getSql("maxExtractRequestSql"));
initSequence(Constants.SEQUENCE_EXTRACT_REQ, maxRequestId);
}
}

private void initSequence(String name, long initialValue) {
Expand Down Expand Up @@ -198,6 +207,15 @@ protected Sequence get(ISqlTransaction transaction, String name) {
}
}

protected Map<String, Sequence> getAll() {
Map<String, Sequence> map = new HashMap<String, Sequence>();
List<Sequence> sequences = sqlTemplate.query(getSql("getAllSequenceSql"), new SequenceRowMapper());
for (Sequence sequence : sequences) {
map.put(sequence.getSequenceName(), sequence);
}
return map;
}

class SequenceRowMapper implements ISqlRowMapper<Sequence> {
public Sequence mapRow(Row rs) {
Sequence sequence = new Sequence();
Expand Down
Expand Up @@ -33,7 +33,11 @@ public SequenceServiceSqlMap(IDatabasePlatform platform, Map<String, String> rep
putSql("getSequenceSql",
"select sequence_name,current_value,increment_by,min_value,max_value, " +
"cycle,create_time,last_update_by,last_update_time from $(sequence) where sequence_name=?");


putSql("getAllSequenceSql",
"select sequence_name,current_value,increment_by,min_value,max_value," +
"cycle,create_time,last_update_by,last_update_time from $(sequence)");

putSql("getCurrentValueSql",
"select current_value from $(sequence) where sequence_name=?");

Expand Down
Expand Up @@ -1047,8 +1047,7 @@ public void syncTriggers(boolean force) {
}

public void syncTriggers(StringBuilder sqlBuffer, boolean force) {
if (platform.getDdlBuilder().getDatabaseInfo().isTriggersSupported() &&
(parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS) || isCalledFromSymmetricAdminTool())) {
if ((parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS) || isCalledFromSymmetricAdminTool())) {
synchronized (this) {
if (clusterService.lock(ClusterConstants.SYNCTRIGGERS)) {
try {
Expand Down Expand Up @@ -1109,12 +1108,7 @@ public void syncTriggers(StringBuilder sqlBuffer, boolean force) {
}
}
} else {
if (!platform.getDdlBuilder().getDatabaseInfo().isTriggersSupported()) {
log.info("Not synchronizing triggers. Platform does not support triggers.");
} else {
log.info("Not synchronizing triggers. {} is set to false",
ParameterConstants.AUTO_SYNC_TRIGGERS);
}
log.info("Not synchronizing triggers. {} is set to false", ParameterConstants.AUTO_SYNC_TRIGGERS);
}
}

Expand Down
Expand Up @@ -40,4 +40,12 @@ protected boolean allowsNullForIdentityColumn() {
return false;
}

@Override
protected String getSelectLastInsertIdSql(String sequenceName) {
if (sequenceName.equals("sym_data_data_id")) {
return "select max(data_id) from sym_data";
}
throw new UnsupportedOperationException();
}

}

0 comments on commit f5d2885

Please sign in to comment.