Skip to content

Commit

Permalink
Merge branch '3.8' of https://github.com/JumpMind/symmetric-ds.git in…
Browse files Browse the repository at this point in the history
…to 3.8
  • Loading branch information
jumpmind-josh committed Oct 4, 2016
2 parents 3cd43c2 + e1c04c5 commit c2e55e4
Show file tree
Hide file tree
Showing 22 changed files with 425 additions and 55 deletions.
Expand Up @@ -27,6 +27,8 @@
import org.jumpmind.db.model.Table;
import org.jumpmind.db.sql.ISqlRowMapper;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.Row;
import org.jumpmind.db.sql.RowMapper;

import android.database.sqlite.SQLiteDatabase;

Expand Down Expand Up @@ -56,6 +58,16 @@ public boolean isInBatchMode() {
return false;
}

@Override
public Row queryForRow(String sql, Object... args) {
List<Row> rows = query(sql, new RowMapper(), args, null);
if (rows.size() > 0) {
return rows.get(0);
} else {
return null;
}
}

public <T> List<T> query(String sql, ISqlRowMapper<T> mapper, Map<String, Object> namedParams) {
return sqlTemplate.query(sql, mapper, namedParams);
}
Expand Down
Expand Up @@ -277,6 +277,7 @@ protected IDatabasePlatform createDatabasePlatform(TypedProperties properties) {

public static IDatabasePlatform createDatabasePlatform(ApplicationContext springContext, TypedProperties properties,
DataSource dataSource, boolean waitOnAvailableDatabase) {
log.info("Initializing connection to database");
if (dataSource == null) {
String jndiName = properties.getProperty(ParameterConstants.DB_JNDI_NAME);
if (StringUtils.isNotBlank(jndiName)) {
Expand Down
Expand Up @@ -53,6 +53,7 @@
import org.jumpmind.symmetric.db.firebird.Firebird20SymmetricDialect;
import org.jumpmind.symmetric.db.firebird.Firebird21SymmetricDialect;
import org.jumpmind.symmetric.db.firebird.FirebirdSymmetricDialect;
import org.jumpmind.symmetric.db.generic.GenericSymmetricDialect;
import org.jumpmind.symmetric.db.h2.H2SymmetricDialect;
import org.jumpmind.symmetric.db.hsqldb.HsqlDbSymmetricDialect;
import org.jumpmind.symmetric.db.hsqldb2.HsqlDb2SymmetricDialect;
Expand Down Expand Up @@ -155,7 +156,7 @@ public ISymmetricDialect create() {
} else if (platform instanceof VoltDbDatabasePlatform) {
dialect = new VoltDbSymmetricDialect(parameterService, platform);
} else {
throw new DbNotSupportedException();
dialect = new GenericSymmetricDialect(parameterService, platform);
}
return dialect;
}
Expand Down
@@ -0,0 +1,53 @@
package org.jumpmind.symmetric.db.generic;

import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.util.BinaryEncoding;
import org.jumpmind.symmetric.db.AbstractSymmetricDialect;
import org.jumpmind.symmetric.service.IParameterService;

public class GenericSymmetricDialect extends AbstractSymmetricDialect {

public GenericSymmetricDialect(IParameterService parameterService, IDatabasePlatform platform) {
super(parameterService, platform);
this.triggerTemplate = new GenericTriggerTemplate(this);
this.supportsSubselectsInDelete = false;
this.supportsSubselectsInUpdate = false;
}

@Override
public void cleanDatabase() {
}

@Override
public void disableSyncTriggers(ISqlTransaction transaction, String nodeId) {
}

@Override
public void enableSyncTriggers(ISqlTransaction transaction) {
}

@Override
public String getSyncTriggersExpression() {
return null;
}

@Override
public void dropRequiredDatabaseObjects() {
}

@Override
public void createRequiredDatabaseObjects() {
}

@Override
public BinaryEncoding getBinaryEncoding() {
return null;
}

@Override
protected boolean doesTriggerExistOnPlatform(String catalogName, String schema, String tableName, String triggerName) {
return false;
}

}
@@ -0,0 +1,40 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.db.generic;

import java.util.HashMap;

import org.jumpmind.symmetric.db.AbstractTriggerTemplate;
import org.jumpmind.symmetric.db.ISymmetricDialect;

public class GenericTriggerTemplate extends AbstractTriggerTemplate {

public GenericTriggerTemplate(ISymmetricDialect symmetricDialect) {
super(symmetricDialect);

sqlTemplates = new HashMap<String,String>();
sqlTemplates.put("insertTriggerTemplate" , "");
sqlTemplates.put("updateTriggerTemplate" , "");
sqlTemplates.put("deleteTriggerTemplate" , "");
sqlTemplates.put("initialLoadSqlTemplate" , "select $(columns) from $(schemaName)$(tableName) t where $(whereClause) " );
}

}
Expand Up @@ -24,9 +24,6 @@
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.util.BinaryEncoding;
import org.jumpmind.symmetric.db.AbstractSymmetricDialect;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.db.JdbcSymmetricDialectFactory;
import org.jumpmind.symmetric.db.postgresql.GreenplumTriggerTemplate;
import org.jumpmind.symmetric.service.IParameterService;

public class VoltDbSymmetricDialect extends AbstractSymmetricDialect {
Expand Down
Expand Up @@ -39,34 +39,13 @@ public VoltDbTriggerTemplate(ISymmetricDialect symmetricDialect) {
triggerConcatCharacter = "||" ;
newTriggerValue = "" ;
oldTriggerValue = "" ;
// timeColumnTemplate = null;
// dateColumnTemplate = null;
// clobColumnTemplate = "case when $(tableAlias)..\"$(columnName)\" is null then '' else '\"' || replace(replace($(tableAlias)..\"$(columnName)\",$$\\$$,$$\\\\$$),'\"',$$\\\"$$) || '\"' end" ;
// blobColumnTemplate = "case when $(tableAlias)..\"$(columnName)\" is null then '' else '\"' || pg_catalog.encode($(tableAlias)..\"$(columnName)\", 'base64') || '\"' end" ;
// wrappedBlobColumnTemplate = "case when $(tableAlias)..\"$(columnName)\" is null then '' else '\"' || $(defaultSchema)$(prefixName)_largeobject($(tableAlias)..\"$(columnName)\") || '\"' end" ;
// booleanColumnTemplate = "case when $(tableAlias)..\"$(columnName)\" is null then '' when $(tableAlias)..\"$(columnName)\" then '\"1\"' else '\"0\"' end" ;
// triggerConcatCharacter = "||" ;
// newTriggerValue = "new" ;
// oldTriggerValue = "old" ;
// oldColumnPrefix = "" ;
// newColumnPrefix = "" ;
// otherColumnTemplate = null;

sqlTemplates = new HashMap<String,String>();

sqlTemplates.put("insertTriggerTemplate" , "");
sqlTemplates.put("updateTriggerTemplate" , "");
sqlTemplates.put("deleteTriggerTemplate" , "");
sqlTemplates.put("initialLoadSqlTemplate" ,
"select $(columns) from $(schemaName)$(tableName) t where $(whereClause) " );
sqlTemplates.put("initialLoadSqlTemplate" , "select $(columns) from $(schemaName)$(tableName) t where $(whereClause) " );
}

//
// sqlTemplates.put("deletePostTriggerTemplate" ,
//"create trigger $(triggerName) after delete on $(schemaName)$(tableName) " +
//" for each row execute procedure $(schemaName)f$(triggerName)(); " );
//
// sqlTemplates.put("initialLoadSqlTemplate" ,
//"select $(columns) from $(schemaName)$(tableName) t where $(whereClause) " );
// }
}
Expand Up @@ -959,7 +959,7 @@ protected boolean isPreviouslyExtracted(OutgoingBatch currentBatch) {
protected boolean isRetry(OutgoingBatch currentBatch, Node remoteNode) {
IStagedResource previouslyExtracted = getStagedResource(currentBatch);
return previouslyExtracted != null && previouslyExtracted.exists() && previouslyExtracted.getState() != State.CREATE
&& currentBatch.getStatus() != OutgoingBatch.Status.RS && remoteNode.isVersionGreaterThanOrEqualTo(3, 8, 0);
&& currentBatch.getStatus() != OutgoingBatch.Status.RS && currentBatch.getSentCount() > 0 && remoteNode.isVersionGreaterThanOrEqualTo(3, 8, 0);
}

protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNode,
Expand Down
Expand Up @@ -33,6 +33,7 @@
import java.util.Set;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.model.Table;
Expand Down Expand Up @@ -1624,27 +1625,52 @@ protected Data createData(ISqlTransaction transaction, Trigger trigger, String w
String rowData = null;
String pkData = null;
if (whereClause != null) {
rowData = (String) transaction.queryForObject(symmetricDialect
.createCsvDataSql(trigger, triggerHistory, engine
.getConfigurationService().getChannel(trigger.getChannelId()),
whereClause), String.class);
if (rowData != null) {
rowData = rowData.trim();
}
pkData = (String) transaction.queryForObject(symmetricDialect
.createCsvPrimaryKeySql(trigger, triggerHistory, engine
.getConfigurationService().getChannel(trigger.getChannelId()),
whereClause), String.class);
if (pkData != null) {
pkData = pkData.trim();
}
rowData = getCsvDataFor(transaction, trigger, triggerHistory, whereClause, false);
pkData = getCsvDataFor(transaction, trigger, triggerHistory, whereClause, true);
}
data = new Data(trigger.getSourceTableName(), DataEventType.UPDATE, rowData,
pkData, triggerHistory, trigger.getChannelId(), null, null);
}
}
return data;
}

protected String getCsvDataFor(ISqlTransaction transaction, Trigger trigger, TriggerHistory triggerHistory, String whereClause, boolean pkOnly) {
String data = null;
String sql = null;
try {
if (pkOnly) {
sql = symmetricDialect.createCsvPrimaryKeySql(trigger, triggerHistory,
engine.getConfigurationService().getChannel(trigger.getChannelId()), whereClause);
} else {
sql = symmetricDialect.createCsvDataSql(trigger, triggerHistory,
engine.getConfigurationService().getChannel(trigger.getChannelId()), whereClause);
}
} catch (NotImplementedException e) {
}

if (isNotBlank(sql)) {
data = transaction.queryForObject(sql, String.class);
} else {
DatabaseInfo databaseInfo = platform.getDatabaseInfo();
String quote = databaseInfo.getDelimiterToken() == null || !parameterService.is(ParameterConstants.DB_DELIMITED_IDENTIFIER_MODE)
? "" : databaseInfo.getDelimiterToken();
sql = "select " + triggerHistory.getColumnNames() + " from "
+ Table.getFullyQualifiedTableName(triggerHistory.getSourceCatalogName(), triggerHistory.getSourceSchemaName(),
triggerHistory.getSourceTableName(), quote, databaseInfo.getCatalogSeparator(),
databaseInfo.getSchemaSeparator()) + " t where " + whereClause;
Row row = transaction.queryForRow(sql);
if (row != null) {
data = row.csvValue();
}
}

if (data != null) {
data = data.trim();
}

return data;
}

public long countDataGapsByStatus(DataGap.Status status) {
return sqlTemplate.queryForLong(getSql("countDataGapsByStatusSql"), new Object[] { status.name() });
Expand Down
Expand Up @@ -25,6 +25,7 @@ final public class DatabaseNamesConstants {
private DatabaseNamesConstants() {
}

public final static String GENERIC = "generic";
public final static String H2 = "h2";
public final static String HSQLDB = "hsqldb";
public final static String HSQLDB2 = "hsqldb2";
Expand Down
Expand Up @@ -33,6 +33,8 @@ public interface ISqlTransaction {

public <T> T queryForObject(String sql, Class<T> clazz, Object... args);

public Row queryForRow(String sql, Object... args);

public int queryForInt(String sql, Object... args);

public long queryForLong(String sql, Object... args);
Expand Down
13 changes: 13 additions & 0 deletions symmetric-db/src/main/java/org/jumpmind/db/sql/RowMapper.java
@@ -0,0 +1,13 @@
package org.jumpmind.db.sql;

public class RowMapper implements ISqlRowMapper<Row> {

public RowMapper() {
}

@Override
public Row mapRow(Row row) {
return row;
}

}
Expand Up @@ -65,7 +65,7 @@ public enum Format {
};

public enum Compatible {
DB2, DERBY, FIREBIRD, FIREBIRD_DIALECT1, GREENPLUM, H2, HSQLDB, HSQLDB2, INFORMIX, INTERBASE, MSSQL, MSSQL2000, MSSQL2005, MSSQL2008, MYSQL, ORACLE, POSTGRES, SYBASE, SQLITE, MARIADB, ASE, SQLANYWHERE, REDSHIFT, VOLTDB
DB2, DERBY, FIREBIRD, FIREBIRD_DIALECT1, GREENPLUM, H2, HSQLDB, HSQLDB2, INFORMIX, INTERBASE, MSSQL, MSSQL2000, MSSQL2005, MSSQL2008, MYSQL, ORACLE, POSTGRES, SYBASE, SQLITE, MARIADB, ASE, SQLANYWHERE, REDSHIFT, VOLTDB, GENERIC
};

private Format format = Format.SQL;
Expand Down
Expand Up @@ -228,8 +228,8 @@ protected LoadStatus delete(CsvData data, boolean useConflictDetection) {
lookupColumns.add(versionColumn);
} else {
log.error(
"Could not find the timestamp/version column with the name {}. Defaulting to using primary keys for the lookup.",
conflict.getDetectExpression());
"Could not find the timestamp/version column with the name {} on table {}. Defaulting to using primary keys for the lookup.",
conflict.getDetectExpression(), targetTable.getName());
}
Column[] pks = targetTable.getPrimaryKeyColumns();
for (Column column : pks) {
Expand Down Expand Up @@ -376,8 +376,8 @@ protected LoadStatus update(CsvData data, boolean applyChangesOnly, boolean useC
lookupColumns.add(versionColumn);
} else {
log.error(
"Could not find the timestamp/version column with the name {}. Defaulting to using primary keys for the lookup.",
conflict.getDetectExpression());
"Could not find the timestamp/version column with the name {} on table {}. Defaulting to using primary keys for the lookup.",
conflict.getDetectExpression(), targetTable.getName());
}
pks = targetTable.getPrimaryKeyColumns();
for (Column column : pks) {
Expand Down
Expand Up @@ -39,6 +39,7 @@
import org.jumpmind.db.platform.derby.DerbyDatabasePlatform;
import org.jumpmind.db.platform.firebird.FirebirdDatabasePlatform;
import org.jumpmind.db.platform.firebird.FirebirdDialect1DatabasePlatform;
import org.jumpmind.db.platform.generic.GenericJdbcDatabasePlatform;
import org.jumpmind.db.platform.greenplum.GreenplumPlatform;
import org.jumpmind.db.platform.h2.H2DatabasePlatform;
import org.jumpmind.db.platform.hsqldb.HsqlDbDatabasePlatform;
Expand Down Expand Up @@ -168,7 +169,7 @@ public static synchronized IDatabasePlatform createNewPlatformInstance(DataSourc


protected static synchronized Class<? extends IDatabasePlatform> findPlatformClass(
String[] nameVersion) throws DdlException {
String[] nameVersion) {
Class<? extends IDatabasePlatform> platformClass = platforms.get(String.format("%s%s",
nameVersion[0], nameVersion[1]).toLowerCase());

Expand All @@ -181,11 +182,10 @@ protected static synchronized Class<? extends IDatabasePlatform> findPlatformCla
}

if (platformClass == null) {
throw new DdlException("Could not find platform for database " + nameVersion[0]);
} else {
return platformClass;
}

platformClass = GenericJdbcDatabasePlatform.class;
}

return platformClass;
}

protected static String[] determineDatabaseNameVersionSubprotocol(DataSource dataSource)
Expand Down

0 comments on commit c2e55e4

Please sign in to comment.