Skip to content

Commit

Permalink
Merge branch '3.9' of https://github.com/JumpMind/symmetric-ds.git in…
Browse files Browse the repository at this point in the history
…to 3.9
  • Loading branch information
klementinastojanovska committed Sep 22, 2017
2 parents 3d05d4a + 640b0f8 commit 5e15b69
Show file tree
Hide file tree
Showing 60 changed files with 1,392 additions and 475 deletions.
Expand Up @@ -104,7 +104,7 @@ protected IDatabasePlatform createDatabasePlatform(TypedProperties properties) {
protected IStagingManager createStagingManager() {
String directory = androidContext.getCacheDir().toString();
log.info("Staging manager directory: " + directory);
return new StagingManager(directory);
return new StagingManager(directory,false);
}

@Override
Expand Down
2 changes: 2 additions & 0 deletions symmetric-assemble/common.gradle
Expand Up @@ -141,6 +141,8 @@ subprojects { subproject ->
systemProperty "test.client", System.getProperty("test.client", "h2")
systemProperty "test.root", System.getProperty("test.root", "h2")
systemProperty "port.number", System.getProperty("port.number", "31415")
systemProperty "db2i.db.user", System.getProperty("db2i.db.user")
systemProperty "db2i.db.password", System.getProperty("db2i.db.password")
minHeapSize = "128m"
maxHeapSize = "512m"
forkEvery=1
Expand Down
Expand Up @@ -41,6 +41,7 @@
import org.jumpmind.db.platform.nuodb.NuoDbDatabasePlatform;
import org.jumpmind.db.platform.oracle.OracleDatabasePlatform;
import org.jumpmind.db.platform.postgresql.PostgreSqlDatabasePlatform;
import org.jumpmind.db.platform.raima.RaimaDatabasePlatform;
import org.jumpmind.db.platform.redshift.RedshiftDatabasePlatform;
import org.jumpmind.db.platform.sqlanywhere.SqlAnywhereDatabasePlatform;
import org.jumpmind.db.platform.sqlite.SqliteDatabasePlatform;
Expand Down Expand Up @@ -69,6 +70,7 @@
import org.jumpmind.symmetric.db.oracle.OracleSymmetricDialect;
import org.jumpmind.symmetric.db.postgresql.GreenplumSymmetricDialect;
import org.jumpmind.symmetric.db.postgresql.PostgreSqlSymmetricDialect;
import org.jumpmind.symmetric.db.raima.RaimaSymmetricDialect;
import org.jumpmind.symmetric.db.redshift.RedshiftSymmetricDialect;
import org.jumpmind.symmetric.db.sqlanywhere.SqlAnywhereSymmetricDialect;
import org.jumpmind.symmetric.db.sqlite.SqliteJdbcSymmetricDialect;
Expand Down Expand Up @@ -163,6 +165,8 @@ public ISymmetricDialect create() {
dialect = new TiberoSymmetricDialect(parameterService, platform);
} else if (platform instanceof NuoDbDatabasePlatform){
dialect = new NuoDbSymmetricDialect(parameterService, platform);
} else if (platform instanceof RaimaDatabasePlatform){
dialect = new RaimaSymmetricDialect(parameterService, platform);
}else{
dialect = new GenericSymmetricDialect(parameterService, platform);
}
Expand Down
@@ -0,0 +1,127 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.db.raima;

import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.platform.PermissionType;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.util.BinaryEncoding;
import org.jumpmind.symmetric.db.AbstractSymmetricDialect;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.service.IParameterService;

public class RaimaSymmetricDialect extends AbstractSymmetricDialect implements ISymmetricDialect {

public RaimaSymmetricDialect(IParameterService parameterService, IDatabasePlatform platform) {
super(parameterService, platform);
this.triggerTemplate = new RaimaTriggerTemplate(this);
this.parameterService = parameterService;
}

@Override
public void createRequiredDatabaseObjects() {
}

@Override
public void dropRequiredDatabaseObjects() {
}

@Override
public boolean supportsTransactionId() {
return false;
}

@Override
protected boolean doesTriggerExistOnPlatform(String catalog, String schema, String tableName,
String triggerName) {
/*
schema = schema == null ? (platform.getDefaultSchema() == null ? null : platform
.getDefaultSchema()) : schema;
String checkSchemaSql = (schema != null && schema.length() > 0) ? " and schema='"
+ schema + "'"
: "";
return platform
.getSqlTemplate()
.queryForInt(
"select count(*) from system.triggers where triggername = ? and tablename = ?"
+ checkSchemaSql, new Object[] { triggerName, tableName }) > 0;
*/
return true;
}

@Override
public void removeTrigger(StringBuilder sqlBuffer, String catalogName, String schemaName,
String triggerName, String tableName) {
/*
final String sql = "drop trigger " + triggerName;
logSql(sql, sqlBuffer);
if (parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS)) {
try {
platform.getSqlTemplate().update(sql);
} catch (Exception e) {
log.warn("Trigger does not exist");
}
}
*/
}

public void disableSyncTriggers(ISqlTransaction transaction, String nodeId) {
transaction.prepareAndExecute("declare sync_triggers_disabled smallint; set @sync_triggers_disabled = 1;");
if (nodeId != null) {
transaction.prepareAndExecute("declare sync_node_disabled varchar(50); set @sync_node_disabled = '" + nodeId + "';");
}
}

public void enableSyncTriggers(ISqlTransaction transaction) {
transaction.prepareAndExecute("declare sync_triggers_disabled smallint; set @sync_triggers_disabled = null;");
transaction.prepareAndExecute("declare sync_node_disabled varchar(50); set @sync_node_disabled = null;");
}

public String getSyncTriggersExpression() {
return "@sync_triggers_disabled is null";
}

public void cleanDatabase() {
}

@Override
public boolean isClobSyncSupported() {
return false;
}

@Override
public boolean isBlobSyncSupported() {
return false;
}

@Override
public BinaryEncoding getBinaryEncoding() {
return BinaryEncoding.NONE;
}

@Override
public PermissionType[] getSymTablePermissions() {
PermissionType[] permissions = { PermissionType.CREATE_TABLE, PermissionType.DROP_TABLE, PermissionType.CREATE_TRIGGER, PermissionType.DROP_TRIGGER};
return permissions;
}

}

@@ -0,0 +1,128 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.db.raima;

import java.util.HashMap;

import org.jumpmind.symmetric.db.AbstractTriggerTemplate;
import org.jumpmind.symmetric.db.ISymmetricDialect;

public class RaimaTriggerTemplate extends AbstractTriggerTemplate {

public RaimaTriggerTemplate(ISymmetricDialect symmetricDialect) {
super(symmetricDialect);
emptyColumnTemplate = "''" ;
stringColumnTemplate = "if($(tableAlias).$(columnName) is null, '', '\"' || replace(replace($(tableAlias).$(columnName), '\\\\','\\\\\\\\'), '\"','\\\"') || '\"')";
numberColumnTemplate = "if($(tableAlias).$(columnName) is null, '', convert($(tableAlias).$(columnName), char))";
datetimeColumnTemplate = "if($(tableAlias).$(columnName) is null, '', '\"' || convert($(tableAlias).$(columnName), char) || '\"')";
clobColumnTemplate = stringColumnTemplate;
blobColumnTemplate = "''" ;
booleanColumnTemplate = "if($(tableAlias).$(columnName) is null, '', convert($(tableAlias).$(columnName), char))";
triggerConcatCharacter = "||" ;
newTriggerValue = "new_row" ;
oldTriggerValue = "old_row" ;
oldColumnPrefix = "" ;
newColumnPrefix = "" ;

sqlTemplates = new HashMap<String,String>();
sqlTemplates.put("insertTriggerTemplate" ,
"create trigger $(triggerName) after insert on $(schemaName)$(tableName) \n" +
"referencing new row as new_row \n" +
"for each row begin atomic \n" +
"declare sync_triggers_disabled smallint; \n" +
"declare sync_node_disabled varchar(50); \n" +
"$(custom_before_insert_text) \n" +
"if $(syncOnInsertCondition) and $(syncOnIncomingBatchCondition) then \n" +
" insert into $(defaultSchema)$(prefixName)_data \n" +
" (table_name, event_type, trigger_hist_id, row_data, channel_id, transaction_id, source_node_id, external_data, create_time) \n" +
" values('$(targetTableName)', 'I', $(triggerHistoryId), \n" +
" $(columns), \n" +
" $(channelExpression), $(txIdExpression), @sync_node_disabled, \n" +
" $(externalSelect), \n" +
" current_timestamp); \n" +
"end if; \n" +
"$(custom_on_insert_text) \n" +
"end");

sqlTemplates.put("insertReloadTriggerTemplate" ,
"create trigger $(triggerName) after insert on $(schemaName)$(tableName) \n" +
"referencing new row as new_row \n" +
"for each row begin atomic \n" +
"declare sync_triggers_disabled smallint; \n" +
"declare sync_node_disabled varchar(50); \n" +
"$(custom_before_insert_text) \n" +
"if $(syncOnInsertCondition) and $(syncOnIncomingBatchCondition) then \n" +
" insert into $(defaultSchema)$(prefixName)_data \n" +
" (table_name, event_type, trigger_hist_id, row_data, channel_id, transaction_id, source_node_id, external_data, create_time) \n" +
" values('$(targetTableName)', 'R', $(triggerHistoryId), \n" +
" $(newKeys), \n" +
" $(channelExpression), $(txIdExpression), @sync_node_disabled, \n" +
" $(externalSelect), \n" +
" current_timestamp); \n" +
"end if; \n" +
"$(custom_on_insert_text) \n" +
"end");

sqlTemplates.put("updateTriggerTemplate" ,
"create trigger $(triggerName) after update on $(schemaName)$(tableName) \n" +
"referencing new row as new_row old_row as old \n" +
"for each row begin atomic \n" +
"declare sync_triggers_disabled smallint; \n" +
"declare sync_node_disabled varchar(50); \n" +
"$(custom_before_update_text) \n" +
"if $(syncOnUpdateCondition) and $(syncOnIncomingBatchCondition) then \n" +
" insert into $(defaultSchema)$(prefixName)_data \n" +
" (table_name, event_type, trigger_hist_id, pk_data, row_data, old_data, channel_id, transaction_id, source_node_id, external_data, create_time) \n" +
" values('$(targetTableName)', 'U', $(triggerHistoryId), \n" +
" $(oldKeys), \n" +
" $(columns), \n" +
" $(oldColumns), \n" +
" $(channelExpression), $(txIdExpression), @sync_node_disabled, \n" +
" $(externalSelect), \n" +
" current_timestamp); \n" +
"end if; \n" +
"$(custom_on_update_text) \n" +
"end");

sqlTemplates.put("deleteTriggerTemplate" ,
"create trigger $(triggerName) after delete on $(schemaName)$(tableName) \n" +
"referencing old row as old_row " +
"for each row begin atomic \n" +
"declare sync_triggers_disabled smallint; \n" +
"declare sync_node_disabled varchar(50); \n" +
"$(custom_before_delete_text) \n" +
"if $(syncOnDeleteCondition) and $(syncOnIncomingBatchCondition) then \n" +
" insert into $(defaultSchema)$(prefixName)_data \n" +
" (table_name, event_type, trigger_hist_id, pk_data, old_data, channel_id, transaction_id, source_node_id, external_data, create_time) \n" +
" values('$(targetTableName)', 'D', $(triggerHistoryId), \n" +
" $(oldKeys), \n" +
" $(oldColumns), \n" +
" $(channelExpression), $(txIdExpression), @sync_node_disabled, \n" +
" $(externalSelect), \n" +
" current_timestamp); \n" +
"end if; \n" +
"$(custom_on_delete_text) \n" +
"end");

sqlTemplates.put("initialLoadSqlTemplate", "select $(columns) from $(schemaName)$(tableName) as t where $(whereClause) ");
}

}
Expand Up @@ -54,7 +54,7 @@ public static void setup() throws Exception {
.equals("net.sourceforge.jtds.jdbc.Driver")) {
platform = DbTestUtils.createDatabasePlatform(DbTestUtils.ROOT);
platform.createDatabase(platform.readDatabaseFromXml("/testBulkWriter.xml", true), true, false);
stagingManager = new StagingManager("target/tmp");
stagingManager = new StagingManager("target/tmp",false);
}
}

Expand Down
Expand Up @@ -45,7 +45,7 @@ public static void setup() throws Exception {
.equals("com.mysql.jdbc.Driver")) {
platform = DbTestUtils.createDatabasePlatform(DbTestUtils.ROOT);
platform.createDatabase(platform.readDatabaseFromXml("/testBulkWriter.xml", true), true, false);
stagingManager = new StagingManager("tmp");
stagingManager = new StagingManager("tmp",false);
}
}

Expand Down
Expand Up @@ -382,14 +382,14 @@ protected void init() {

String updateServiceClassName = properties.get(ParameterConstants.UPDATE_SERVICE_CLASS);
if (updateServiceClassName == null) {
this.updateService = new UpdateService(this);
this.updateService = new UpdateService(this);
} else {
try {
Constructor<?> cons = Class.forName(updateServiceClassName).getConstructor(ISymmetricEngine.class);
this.updateService = (IUpdateService) cons.newInstance(this);
} catch (Exception e) {
throw new RuntimeException(e);
}
try {
Constructor<?> cons = Class.forName(updateServiceClassName).getConstructor(ISymmetricEngine.class);
this.updateService = (IUpdateService) cons.newInstance(this);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

if (parameterService.isRegistrationServer()) {
Expand Down
Expand Up @@ -18,7 +18,7 @@ public class BatchStagingManager extends StagingManager {
ISymmetricEngine engine;

public BatchStagingManager(ISymmetricEngine engine, String directory) {
super(directory);
super(directory,engine.getParameterService().is(ParameterConstants.CLUSTER_LOCKING_ENABLED));
this.engine = engine;
}

Expand Down

0 comments on commit 5e15b69

Please sign in to comment.