Skip to content

Commit

Permalink
Merge pull request #3 from woehrl01/sqlite-without-contextable
Browse files Browse the repository at this point in the history
Provide functionality to not require context table on sqlite for threadsafe data capture
  • Loading branch information
chenson42 committed Sep 21, 2015
2 parents 9710bd4 + 8b2e477 commit 23de65e
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 14 deletions.
Expand Up @@ -290,8 +290,11 @@ private ParameterConstants() {
public final static String MSSQL_ROW_LEVEL_LOCKS_ONLY = "mssql.allow.only.row.level.locks.on.runtime.tables";

public final static String MSSQL_USE_NTYPES_FOR_SYNC = "mssql.use.ntypes.for.sync";


public final static String MSSQL_TRIGGER_EXECUTE_AS = "mssql.trigger.execute.as";

public final static String SQLITE_TRIGGER_FUNCTION_TO_USE = "sqlite.trigger.function.to.use";

public final static String EXTENSIONS_XML = "extensions.xml";

Expand Down
Expand Up @@ -20,15 +20,23 @@
*/
package org.jumpmind.symmetric.db.sqlite;

import java.sql.Connection;
import java.sql.SQLException;

import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.sql.IConnectionCallback;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.JdbcSqlTransaction;
import org.jumpmind.db.sql.SqlException;
import org.jumpmind.db.util.BinaryEncoding;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.AbstractSymmetricDialect;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.util.AppUtils;

import com.mysql.jdbc.StringUtils;

public class SqliteSymmetricDialect extends AbstractSymmetricDialect {

public static final String CONTEXT_TABLE_NAME = "context";
Expand All @@ -41,13 +49,21 @@ public class SqliteSymmetricDialect extends AbstractSymmetricDialect {
static final String SYNC_TRIGGERS_DISABLED_USER_VARIABLE = "sync_triggers_disabled";
static final String SYNC_TRIGGERS_DISABLED_NODE_VARIABLE = "sync_node_disabled";

String sqliteFunctionToOverride;

public SqliteSymmetricDialect(IParameterService parameterService, IDatabasePlatform platform) {
super(parameterService, platform);
this.triggerTemplate = new SqliteTriggerTemplate(this);

sqliteFunctionToOverride = parameterService.getString(ParameterConstants.SQLITE_TRIGGER_FUNCTION_TO_USE);
}

@Override
public void createRequiredDatabaseObjects() {
if(!StringUtils.isNullOrEmpty(sqliteFunctionToOverride)){
return;
}

String contextTableName = parameterService.getTablePrefix() + "_" + CONTEXT_TABLE_NAME;
try {
platform.getSqlTemplate().queryForInt("select count(*) from " + contextTableName);
Expand All @@ -68,26 +84,63 @@ public void dropRequiredDatabaseObjects() {

public void cleanDatabase() {
}

private void setSqliteFunctionResult(ISqlTransaction transaction, final String name, final String result){
JdbcSqlTransaction trans = (JdbcSqlTransaction)transaction;
trans.executeCallback(new IConnectionCallback<Object>() {
@Override
public Object execute(Connection con) throws SQLException {
org.sqlite.SQLiteConnection unwrapped = ((org.sqlite.SQLiteConnection)((org.apache.commons.dbcp.DelegatingConnection)con).getInnermostDelegate());

org.sqlite.Function.create(unwrapped, name, new org.sqlite.Function() {
@Override
protected void xFunc() throws SQLException {
this.result(result);
}
});

return null;
}
});
}


public void disableSyncTriggers(ISqlTransaction transaction, String nodeId) {
String contextTableName = parameterService.getTablePrefix() + "_" + CONTEXT_TABLE_NAME;
transaction.prepareAndExecute(String.format(CONTEXT_TABLE_INSERT, contextTableName), new Object[] {
SYNC_TRIGGERS_DISABLED_USER_VARIABLE, "1" });
if (nodeId != null) {
if(StringUtils.isNullOrEmpty(sqliteFunctionToOverride)){
String contextTableName = parameterService.getTablePrefix() + "_" + CONTEXT_TABLE_NAME;
transaction.prepareAndExecute(String.format(CONTEXT_TABLE_INSERT, contextTableName), new Object[] {
SYNC_TRIGGERS_DISABLED_NODE_VARIABLE, nodeId });
}
SYNC_TRIGGERS_DISABLED_USER_VARIABLE, "1" });
if (nodeId != null) {
transaction.prepareAndExecute(String.format(CONTEXT_TABLE_INSERT, contextTableName), new Object[] {
SYNC_TRIGGERS_DISABLED_NODE_VARIABLE, nodeId });
}
}else{
String node = "";
if(nodeId != null){
node = ":" + nodeId;
}

setSqliteFunctionResult(transaction, sqliteFunctionToOverride, "DISABLED" + node);
}

}

public void enableSyncTriggers(ISqlTransaction transaction) {
String contextTableName = parameterService.getTablePrefix() + "_" + CONTEXT_TABLE_NAME;
transaction.prepareAndExecute("delete from " + contextTableName);
if(StringUtils.isNullOrEmpty(sqliteFunctionToOverride)){
String contextTableName = parameterService.getTablePrefix() + "_" + CONTEXT_TABLE_NAME;
transaction.prepareAndExecute("delete from " + contextTableName);
}else{
setSqliteFunctionResult(transaction, sqliteFunctionToOverride, "ENABLED");
}
}

public String getSyncTriggersExpression() {
String contextTableName = parameterService.getTablePrefix() + "_" + CONTEXT_TABLE_NAME;
return "(not exists (select context_value from "+contextTableName+" where id = 'sync_triggers_disabled'))";
if(StringUtils.isNullOrEmpty(sqliteFunctionToOverride)){
String contextTableName = parameterService.getTablePrefix() + "_" + CONTEXT_TABLE_NAME;
return "(not exists (select context_value from "+contextTableName+" where id = 'sync_triggers_disabled'))";
}else{
return "("+sqliteFunctionToOverride+"() not like 'DISABLED%')";
}
}


Expand Down
Expand Up @@ -22,14 +22,26 @@

import java.util.HashMap;

import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.AbstractSymmetricDialect;
import org.jumpmind.symmetric.db.AbstractTriggerTemplate;

import com.mysql.jdbc.StringUtils;

public class SqliteTriggerTemplate extends AbstractTriggerTemplate {

public SqliteTriggerTemplate(AbstractSymmetricDialect symmetricDialect) {
super(symmetricDialect);

String sqliteFunctionToOverride = symmetricDialect.getParameterService().getString(ParameterConstants.SQLITE_TRIGGER_FUNCTION_TO_USE);

String sourceNodeExpression;
if(StringUtils.isNullOrEmpty(sqliteFunctionToOverride)){
sourceNodeExpression = "(select context_value from $(prefixName)_context where id = 'sync_node_disabled')";
}else{
sourceNodeExpression = "(select substr(" + sqliteFunctionToOverride + "(), 10) from sqlite_master where " + sqliteFunctionToOverride + "() like 'DISABLED:%')";
}

// formatter:off
triggerConcatCharacter = "||";
newTriggerValue = "new";
Expand All @@ -52,7 +64,7 @@ public SqliteTriggerTemplate(AbstractSymmetricDialect symmetricDialect) {
+ " insert into $(defaultCatalog)$(prefixName)_data (table_name, event_type, trigger_hist_id, row_data, channel_id, transaction_id, source_node_id, external_data, create_time) \n"
+ " values( \n" + " '$(targetTableName)', \n" + " 'I', \n"
+ " $(triggerHistoryId), \n"
+ " $(columns), \n" + " $(channelExpression), null,(select context_value from $(prefixName)_context where id = 'sync_node_disabled'), \n"
+ " $(columns), \n" + " $(channelExpression), null," + sourceNodeExpression + ", \n"
+ " $(externalSelect), \n" + " strftime('%Y-%m-%d %H:%M:%f','now','localtime') \n" + " ); \n"
+ " $(custom_on_insert_text) \n"
+ "end");
Expand All @@ -67,7 +79,7 @@ public SqliteTriggerTemplate(AbstractSymmetricDialect symmetricDialect) {
+ " values( \n" + " '$(targetTableName)', \n" + " 'U', \n"
+ " $(triggerHistoryId), \n" + " $(oldKeys), \n"
+ " $(columns), \n" + " $(oldColumns), \n"
+ " $(channelExpression), null,(select context_value from $(prefixName)_context where id = 'sync_node_disabled'), \n" + " $(externalSelect), \n"
+ " $(channelExpression), null," + sourceNodeExpression + ", \n" + " $(externalSelect), \n"
+ " strftime('%Y-%m-%d %H:%M:%f','now','localtime') \n" + " ); \n"
+ " $(custom_on_insert_text) \n"
+ "end ");
Expand All @@ -81,7 +93,7 @@ public SqliteTriggerTemplate(AbstractSymmetricDialect symmetricDialect) {
+ " insert into $(defaultCatalog)$(prefixName)_data (table_name, event_type, trigger_hist_id, pk_data, old_data, channel_id, transaction_id, source_node_id, external_data, create_time) \n"
+ " values( \n" + " '$(targetTableName)', \n" + " 'D', \n"
+ " $(triggerHistoryId), \n" + " $(oldKeys), \n"
+ " $(oldColumns), \n" + " $(channelExpression), null,(select context_value from $(prefixName)_context where id = 'sync_node_disabled'), \n"
+ " $(oldColumns), \n" + " $(channelExpression), null," + sourceNodeExpression + ", \n"
+ " $(externalSelect), \n" + " strftime('%Y-%m-%d %H:%M:%f','now','localtime') \n" + " ); \n"
+ " $(custom_on_insert_text) \n"
+ "end");
Expand Down
Expand Up @@ -331,7 +331,7 @@ public Integer execute(Connection con) throws SQLException {
});
}

protected <T> T executeCallback(IConnectionCallback<T> callback) {
public <T> T executeCallback(IConnectionCallback<T> callback) {
try {
return callback.execute(this.connection);
} catch (SQLException ex) {
Expand Down

0 comments on commit 23de65e

Please sign in to comment.