Skip to content

Commit

Permalink
Raima support
Browse files Browse the repository at this point in the history
  • Loading branch information
jumpmind-josh committed Jan 15, 2018
1 parent 327f974 commit e9cdbb6
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 55 deletions.
Expand Up @@ -24,6 +24,7 @@
import org.jumpmind.db.platform.PermissionType;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.util.BinaryEncoding;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.AbstractSymmetricDialect;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.service.IParameterService;
Expand All @@ -38,7 +39,7 @@ public RaimaSymmetricDialect(IParameterService parameterService, IDatabasePlatfo

@Override
public void createRequiredDatabaseObjects() {
}
}

@Override
public void dropRequiredDatabaseObjects() {
Expand All @@ -52,26 +53,22 @@ public boolean supportsTransactionId() {
@Override
protected boolean doesTriggerExistOnPlatform(String catalog, String schema, String tableName,
String triggerName) {
/*
schema = schema == null ? (platform.getDefaultSchema() == null ? null : platform
schema = schema == null ? (platform.getDefaultSchema() == null ? null : platform
.getDefaultSchema()) : schema;
String checkSchemaSql = (schema != null && schema.length() > 0) ? " and schema='"
String checkSchemaSql = (schema != null && schema.length() > 0) ? " and schemaname='"
+ schema + "'"
: "";
return platform
.getSqlTemplate()
.queryForInt(
"select count(*) from system.triggers where triggername = ? and tablename = ?"
"select count(*) from sys_trigger where name = ? and tabname = ?"
+ checkSchemaSql, new Object[] { triggerName, tableName }) > 0;
*/
return true;
}

@Override
public void removeTrigger(StringBuilder sqlBuffer, String catalogName, String schemaName,
String triggerName, String tableName) {
/*
final String sql = "drop trigger " + triggerName;
final String sql = "drop trigger " + triggerName;
logSql(sql, sqlBuffer);
if (parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS)) {
try {
Expand All @@ -80,21 +77,25 @@ public void removeTrigger(StringBuilder sqlBuffer, String catalogName, String sc
log.warn("Trigger does not exist");
}
}
*/

}

@Override
public void disableSyncTriggers(ISqlTransaction transaction, String nodeId) {
transaction.prepareAndExecute("declare sync_triggers_disabled smallint;");
transaction.prepareAndExecute("declare @sync_triggers_disabled smallint;");
transaction.prepareAndExecute("set @sync_triggers_disabled = 1;");
if (nodeId != null) {
transaction.prepareAndExecute("declare sync_node_disabled varchar(50);");
transaction.prepareAndExecute("declare @sync_node_disabled varchar(50);");
transaction.prepareAndExecute("set @sync_node_disabled = '" + nodeId + "';");
}
}

@Override
public void enableSyncTriggers(ISqlTransaction transaction) {
transaction.prepareAndExecute("declare sync_triggers_disabled smallint; set @sync_triggers_disabled = null;");
transaction.prepareAndExecute("declare sync_node_disabled varchar(50); set @sync_node_disabled = null;");
transaction.prepareAndExecute("declare @sync_triggers_disabled smallint;");
transaction.prepareAndExecute("set @sync_triggers_disabled = null;");
transaction.prepareAndExecute("declare @sync_node_disabled varchar(50);");
transaction.prepareAndExecute("set @sync_node_disabled = null;");
}

public String getSyncTriggersExpression() {
Expand Down
Expand Up @@ -22,8 +22,13 @@

import java.util.HashMap;

import org.jumpmind.db.model.Table;
import org.jumpmind.symmetric.db.AbstractTriggerTemplate;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.model.Channel;
import org.jumpmind.symmetric.model.Trigger;
import org.jumpmind.symmetric.model.TriggerHistory;

public class RaimaTriggerTemplate extends AbstractTriggerTemplate {

Expand All @@ -43,50 +48,57 @@ public RaimaTriggerTemplate(ISymmetricDialect symmetricDialect) {
newColumnPrefix = "" ;

sqlTemplates = new HashMap<String,String>();
sqlTemplates.put("insertTriggerTemplate" ,
sqlTemplates.put("insertTriggerTemplate", "declare sync_node_disabled varchar(50);");

sqlTemplates.put("insertPostTriggerTemplate" ,
"create trigger $(triggerName) after insert on $(schemaName)$(tableName) \n" +
"referencing new row as new_row \n" +
"for each row begin atomic \n" +
"declare sync_triggers_disabled smallint; \n" +
"declare sync_node_disabled varchar(50); \n" +
"declare local_sync_node_disabled varchar(50);\n" +
"declare continue handler for sqlexception set local_sync_node_disabled = null;\n" +
"$(custom_before_insert_text) \n" +
"if $(syncOnInsertCondition) and $(syncOnIncomingBatchCondition) then \n" +
" set local_sync_node_disabled = @sync_node_disabled; \n" +
" insert into $(defaultSchema)$(prefixName)_data \n" +
" (table_name, event_type, trigger_hist_id, row_data, channel_id, transaction_id, source_node_id, external_data, create_time) \n" +
" values('$(targetTableName)', 'I', $(triggerHistoryId), \n" +
" $(columns), \n" +
" $(channelExpression), $(txIdExpression), @sync_node_disabled, \n" +
" $(channelExpression), $(txIdExpression), local_sync_node_disabled, \n" +
" $(externalSelect), \n" +
" current_timestamp); \n" +
"end if; \n" +
"$(custom_on_insert_text) \n" +
"end");

sqlTemplates.put("insertReloadTriggerTemplate" ,
sqlTemplates.put("insertReloadTriggerTemplate", "declare sync_node_disabled varchar(50);");

sqlTemplates.put("insertPostReloadTriggerTemplate" ,
"create trigger $(triggerName) after insert on $(schemaName)$(tableName) \n" +
"referencing new row as new_row \n" +
"for each row begin atomic \n" +
"declare sync_triggers_disabled smallint; \n" +
"declare sync_node_disabled varchar(50); \n" +
"declare local_sync_node_disabled varchar(50);\n" +
"declare continue handler for sqlexception set local_sync_node_disabled = null;\n" +
"$(custom_before_insert_text) \n" +
"if $(syncOnInsertCondition) and $(syncOnIncomingBatchCondition) then \n" +
" insert into $(defaultSchema)$(prefixName)_data \n" +
" (table_name, event_type, trigger_hist_id, row_data, channel_id, transaction_id, source_node_id, external_data, create_time) \n" +
" values('$(targetTableName)', 'R', $(triggerHistoryId), \n" +
" $(newKeys), \n" +
" $(channelExpression), $(txIdExpression), @sync_node_disabled, \n" +
" $(channelExpression), $(txIdExpression), local_sync_node_disabled, \n" +
" $(externalSelect), \n" +
" current_timestamp); \n" +
"end if; \n" +
"$(custom_on_insert_text) \n" +
"end");

sqlTemplates.put("updateTriggerTemplate" ,
sqlTemplates.put("updateTriggerTemplate", "declare sync_node_disabled varchar(50);");

sqlTemplates.put("updatePostTriggerTemplate" ,
"create trigger $(triggerName) after update on $(schemaName)$(tableName) \n" +
"referencing new row as new_row old_row as old \n" +
"referencing new row as new_row old row as old_row \n" +
"for each row begin atomic \n" +
"declare sync_triggers_disabled smallint; \n" +
"declare sync_node_disabled varchar(50); \n" +
"declare local_sync_node_disabled varchar(50);\n" +
"declare continue handler for sqlexception set local_sync_node_disabled = null;\n" +
"$(custom_before_update_text) \n" +
"if $(syncOnUpdateCondition) and $(syncOnIncomingBatchCondition) then \n" +
" insert into $(defaultSchema)$(prefixName)_data \n" +
Expand All @@ -95,27 +107,29 @@ public RaimaTriggerTemplate(ISymmetricDialect symmetricDialect) {
" $(oldKeys), \n" +
" $(columns), \n" +
" $(oldColumns), \n" +
" $(channelExpression), $(txIdExpression), @sync_node_disabled, \n" +
" $(channelExpression), $(txIdExpression), local_sync_node_disabled, \n" +
" $(externalSelect), \n" +
" current_timestamp); \n" +
"end if; \n" +
"$(custom_on_update_text) \n" +
"end");

sqlTemplates.put("deleteTriggerTemplate" ,
sqlTemplates.put("deleteTriggerTemplate", "declare sync_node_disabled varchar(50);");

sqlTemplates.put("deletePostTriggerTemplate" ,
"create trigger $(triggerName) after delete on $(schemaName)$(tableName) \n" +
"referencing old row as old_row " +
"for each row begin atomic \n" +
"declare sync_triggers_disabled smallint; \n" +
"declare sync_node_disabled varchar(50); \n" +
"declare local_sync_node_disabled varchar(50);\n" +
"declare continue handler for sqlexception set local_sync_node_disabled = null;\n" +
"$(custom_before_delete_text) \n" +
"if $(syncOnDeleteCondition) and $(syncOnIncomingBatchCondition) then \n" +
" insert into $(defaultSchema)$(prefixName)_data \n" +
" (table_name, event_type, trigger_hist_id, pk_data, old_data, channel_id, transaction_id, source_node_id, external_data, create_time) \n" +
" values('$(targetTableName)', 'D', $(triggerHistoryId), \n" +
" $(oldKeys), \n" +
" $(oldColumns), \n" +
" $(channelExpression), $(txIdExpression), @sync_node_disabled, \n" +
" $(channelExpression), $(txIdExpression), local_sync_node_disabled, \n" +
" $(externalSelect), \n" +
" current_timestamp); \n" +
"end if; \n" +
Expand All @@ -124,5 +138,4 @@ public RaimaTriggerTemplate(ISymmetricDialect symmetricDialect) {

sqlTemplates.put("initialLoadSqlTemplate", "select $(columns) from $(schemaName)$(tableName) as t where $(whereClause) ");
}

}
Expand Up @@ -115,7 +115,7 @@ public boolean areColumnSizesTheSame(Column sourceColumn, Column targetColumn){
if(sourceColumn.getMappedType().equals("DECIMAL") && targetColumn.getMappedType().equals("DECIMAL")){
int targetSize = targetColumn.getSizeAsInt();
int sourceSize = sourceColumn.getSizeAsInt();
if (targetSize > 8 && sourceSize == 8 &&
if (targetSize > 8 && sourceSize > 8 &&
targetColumn.getScale() == sourceColumn.getScale()) {
return true;
}else{
Expand Down
Expand Up @@ -19,14 +19,30 @@
package org.jumpmind.db.platform.raima;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.ForeignKey;
import org.jumpmind.db.model.IIndex;
import org.jumpmind.db.model.IndexColumn;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.model.Trigger;
import org.jumpmind.db.model.Trigger.TriggerType;
import org.jumpmind.db.platform.AbstractJdbcDdlReader;
import org.jumpmind.db.platform.DatabaseMetaDataWrapper;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.sql.ChangeCatalogConnectionHandler;
import org.jumpmind.db.sql.IConnectionHandler;
import org.jumpmind.db.sql.ISqlRowMapper;
import org.jumpmind.db.sql.JdbcSqlTemplate;
import org.jumpmind.db.sql.Row;

public class RaimaDdlReader extends AbstractJdbcDdlReader {

Expand All @@ -47,54 +63,75 @@ protected Table readTable(Connection connection, DatabaseMetaDataWrapper metaDat
determineAutoIncrementFromResultSetMetaData(connection, table,
table.getColumns());
table.setCatalog(null);

if (table.getIndexCount() > 0) {
Collection<IIndex> nonPkIndices = new ArrayList<IIndex>();

for (IIndex index : table.getIndices()) {
if (index.getColumnCount() == table.getPrimaryKeyColumnCount()) {
int matches = 0;
for (IndexColumn indexColumn : index.getColumns()) {
for (String pkColName : table.getPrimaryKeyColumnNames()) {
if (pkColName.equals(indexColumn.getName())) {
matches++;
}
}
}
if (matches != index.getColumnCount()) {
nonPkIndices.add(index);
}
}
else {
nonPkIndices.add(index);
}
}

table.removeAllIndices();
table.addIndices(nonPkIndices);
}
}
return table;
}

/*

@Override
protected Column readColumn(DatabaseMetaDataWrapper metaData, Map<String, Object> values)
throws SQLException {
Column column = super.readColumn(metaData, values);
return column;
}
*/

/*

@Override
protected boolean isInternalPrimaryKeyIndex(Connection connection,
DatabaseMetaDataWrapper metaData, Table table, IIndex index) {
return ((table.getName()).toUpperCase() + "..PRIMARY_KEY").equals(index.getName());
}
*/

/*

@Override
protected boolean isInternalForeignKeyIndex(Connection connection,
DatabaseMetaDataWrapper metaData, Table table, ForeignKey fk, IIndex index) {
return getPlatform().getDdlBuilder().getForeignKeyName(table, fk).equals(index.getName());
}
*/

/*
@Override
protected Collection<ForeignKey> readForeignKeys(Connection connection,
DatabaseMetaDataWrapper metaData, String tableName) throws SQLException {

Map<String, ForeignKey> fks = new LinkedHashMap<String, ForeignKey>();
ResultSet fkData = null;
/*
try {
PreparedStatement ps = connection.prepareStatement(
"SELECT f2.FIELD AS REFERENCED_COLUMN_NAME, t2.TABLENAME AS REFERENCED_TABLE_NAME, f.FIELD AS COLUMN_NAME, \"POSITION\", FOREIGNKEYNAME AS CONSTRAINT_NAME " +
"FROM \"SYSTEM\".FOREIGNKEYS AS fk "+
"INNER JOIN SYSTEM.TABLES AS t ON t.TABLEID = fk.FOREIGNTABLEID "+
"INNER JOIN SYSTEM.TABLES AS t2 ON t2.TABLEID = fk.PRIMARYTABLEID "+
"INNER JOIN SYSTEM.FIELDS AS f ON f.FIELDID = fk.FOREIGNFIELDID AND f.TABLENAME = t.TABLENAME AND f.\"SCHEMA\"= t.\"SCHEMA\" " +
"INNER JOIN SYSTEM.FIELDS AS f2 ON f2.FIELDID = fk.PRIMARYFIELDID AND f2.TABLENAME = t2.TABLENAME AND f2.\"SCHEMA\" = t2.\"SCHEMA\" " +
"WHERE t.TABLENAME = ?");
PreparedStatement ps = connection.prepareStatement(
"SELECT f2.name AS REFERENCED_COLUMN_NAME, t2.name AS REFERENCED_TABLE_NAME, f.name AS COLUMN_NAME, \"POSITION\", fk_name AS CONSTRAINT_NAME " +
"FROM sys_fkey AS fk " +
"INNER JOIN sys_table AS t ON t.name = fk.fktabname " +
"INNER JOIN sys_table AS t2 ON t2.name = fk.pktabname " +
"INNER JOIN sys_column AS f ON f.name = fk.fkcolname AND f.tabname = t.name AND f.dbname= t.dbname " +
"INNER JOIN sys_column AS f2 ON f2.name = fk.pkcolname AND f2.tabname = t2.name AND f2.dbname= t2.dbname " +
"WHERE t.name = ? ");
ps.setString(1, tableName);
fkData = ps.executeQuery();
while (fkData.next()) {
Expand All @@ -118,11 +155,10 @@ protected Collection<ForeignKey> readForeignKeys(Connection connection,
} finally {
close(fkData);
}
*/
return fks.values();
}
*/

/*
public List<Trigger> getTriggers(final String catalog, final String schema,
final String tableName) {

Expand Down Expand Up @@ -160,7 +196,6 @@ public Trigger mapRow(Row row) {

return triggers;
}
*/

@Override
protected Integer mapUnknownJdbcTypeForColumn(Map<String, Object> values) {
Expand All @@ -171,4 +206,5 @@ protected Integer mapUnknownJdbcTypeForColumn(Map<String, Object> values) {
return super.mapUnknownJdbcTypeForColumn(values);
}
}

}
Expand Up @@ -20,6 +20,9 @@
*/
package org.jumpmind.db.platform.raima;

import java.sql.Connection;
import java.sql.Types;

import javax.sql.DataSource;

import org.jumpmind.db.platform.DatabaseInfo;
Expand All @@ -33,6 +36,16 @@ public RaimaJdbcSqlTemplate(DataSource dataSource, SqlTemplateSettings settings,
SymmetricLobHandler lobHandler, DatabaseInfo databaseInfo) {
super(dataSource, settings, lobHandler, databaseInfo);
primaryKeyViolationSqlStates = new String[] { "40002" };
setIsolationLevel(Connection.TRANSACTION_READ_COMMITTED);
}

@Override
protected int verifyArgType(Object arg, int argType) {
if (argType == Types.NUMERIC){
return Types.DECIMAL;
} else {
return super.verifyArgType(arg, argType);
}
}

}

0 comments on commit e9cdbb6

Please sign in to comment.