diff --git a/.gitattributes b/.gitattributes
new file mode 100644
index 0000000000..2ca86c7c94
--- /dev/null
+++ b/.gitattributes
@@ -0,0 +1,2 @@
+*.csv text eol=lf
+
diff --git a/symmetric-assemble/build.gradle b/symmetric-assemble/build.gradle
index 23cb1aa67f..d772c25ca6 100644
--- a/symmetric-assemble/build.gradle
+++ b/symmetric-assemble/build.gradle
@@ -209,8 +209,8 @@ project(':symmetric-server') {
exclude group: 'com.fasterxml.jackson.core'
exclude group: 'commons-codec'
}
- provided "javax.servlet:servlet-api:2.5"
- provided "org.eclipse.jetty:jetty-server:$jettyVersion"
+ compile "javax.servlet:javax.servlet-api:$servletVersion"
+ provided "org.eclipse.jetty:jetty-annotations:$jettyVersion"
provided "org.eclipse.jetty:jetty-servlets:$jettyVersion"
provided "org.eclipse.jetty:jetty-webapp:$jettyVersion"
provided "org.eclipse.jetty:jetty-jmx:$jettyVersion"
@@ -323,4 +323,4 @@ task releaseSymmetric {
dependsOn publishDoc
}
-task wrapper(type: Wrapper) { gradleVersion = '2.2.1' }
+task wrapper(type: Wrapper) { gradleVersion = '2.7' }
diff --git a/symmetric-assemble/common.gradle b/symmetric-assemble/common.gradle
index 06c29e95cd..8b69446580 100644
--- a/symmetric-assemble/common.gradle
+++ b/symmetric-assemble/common.gradle
@@ -138,13 +138,13 @@ subprojects { subproject ->
mockitoVersion = '1.9.5'
powerMockVersion = '1.5.3'
mysqlVersion = '5.1.30'
- servletVersion = '3.0.1'
+ servletVersion = '3.1.0'
springVersion = '4.0.5.RELEASE'
jtdsVersion = '1.2.8'
bouncyCastleVersion = '140'
animalSnifferVersion = '1.10'
jnaVersion = '4.1.0'
- jettyVersion = '7.6.3.v20120416'
+ jettyVersion = '9.3.3.v20150827'
env = System.getenv()
}
diff --git a/symmetric-assemble/pom.xml b/symmetric-assemble/pom.xml
index 92968fe42a..25e8380185 100644
--- a/symmetric-assemble/pom.xml
+++ b/symmetric-assemble/pom.xml
@@ -102,27 +102,6 @@
-
- org.apache.db.torque
- torque-maven-plugin
- 3.3-RC2
-
- docbook
- symmetric
- ${docbook.build}
- ${basedir}/../symmetric-core/src/main/resources
- ${basedir}/src/torque
- false
-
-
-
- generate-sources
-
- documentation
-
-
-
-
diff --git a/symmetric-assemble/src/asciidoc/configuration/load-filters.ad b/symmetric-assemble/src/asciidoc/configuration/load-filters.ad
index 863727bff7..7fd9bae3a0 100644
--- a/symmetric-assemble/src/asciidoc/configuration/load-filters.ad
+++ b/symmetric-assemble/src/asciidoc/configuration/load-filters.ad
@@ -15,7 +15,7 @@ endif::pro[]
Filter Id:: The unique identifier for the load filter
Group Link:: The group link for with the load filter will be applied.
-Type:: The type of load filter. Today only Bean Shell and Java are supported ('BSH', 'Java'), but SQL scripts may be added in a future release.
+Type:: The type of load filter. Today only Bean Shell and Java are supported ('BSH', 'Java', 'SQL').
Target Table:: The table on the target which the load filter will execute when changes occur on it.
NOTE: Use the wildcard * to specify all tables configured through the group link. Partial table names in conjunction with a wildcard
diff --git a/symmetric-assemble/src/asciidoc/configuration/load-filters/scripts.ad b/symmetric-assemble/src/asciidoc/configuration/load-filters/scripts.ad
index 84430fb218..b76b023826 100644
--- a/symmetric-assemble/src/asciidoc/configuration/load-filters/scripts.ad
+++ b/symmetric-assemble/src/asciidoc/configuration/load-filters/scripts.ad
@@ -29,15 +29,15 @@ Handle Error Script:: A script to execute if data cannot be processed.
.Variables available within scripts
[cols="3,^1,^1,5"]
|===
-|Variable|BSH|JAVA|Description
-
-|engine|X||The Symmetric engine object.
-|COLUMN_NAME|X||The source values for the row being inserted, updated or deleted.
-|OLD_COLUMN_NAME|X||The old values for the row being inserted, updated or deleted.
-|context|X|X|The data context object for the data being inserted, updated or deleted. .
-|table|X|X|The table object for the table being inserted, updated or deleted.
-|data|X|X|The `CsvData` object for the data change.
-|error|X|X|`java.lang.Exception`
+|Variable|BSH|SQL|JAVA|Description
+
+|engine|X|||The Symmetric engine object.
+|COLUMN_NAME|X|X||The source values for the row being inserted, updated or deleted.
+|OLD_COLUMN_NAME|X|X||The old values for the row being inserted, updated or deleted.
+|context|X||X|The data context object for the data being inserted, updated or deleted. .
+|table|X||X|The table object for the table being inserted, updated or deleted.
+|data|X||X|The `CsvData` object for the data change.
+|error|X||X|`java.lang.Exception`
|===
diff --git a/symmetric-assemble/src/asciidoc/configuration/transforms.ad b/symmetric-assemble/src/asciidoc/configuration/transforms.ad
index b392e9f70b..bb1b0f922f 100644
--- a/symmetric-assemble/src/asciidoc/configuration/transforms.ad
+++ b/symmetric-assemble/src/asciidoc/configuration/transforms.ad
@@ -112,7 +112,7 @@ ifndef::pro[]
----
insert into SYM_TRANSFORM_TABLE (
transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name,
- target_table_name, delete_action, transform_order, column_policy, update_first,
+ target_table_name, update_action, delete_action, transform_order, column_policy, update_first,
last_update_by, last_update_time, create_time
) values (
'itemSellingPriceTransform', 'store', 'corp', 'EXTRACT', 'ITEM_SELLING_PRICE',
diff --git a/symmetric-assemble/src/asciidoc/configuration/transforms/operation-change.ad b/symmetric-assemble/src/asciidoc/configuration/transforms/operation-change.ad
index b422c152f6..29a93878be 100644
--- a/symmetric-assemble/src/asciidoc/configuration/transforms/operation-change.ad
+++ b/symmetric-assemble/src/asciidoc/configuration/transforms/operation-change.ad
@@ -26,7 +26,7 @@ ifndef::pro[]
----
insert into SYM_TRANSFORM_TABLE (
transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name,
- target_table_name, delete_action, transform_order, column_policy, update_first,
+ target_table_name, update_action, delete_action, transform_order, column_policy, update_first,
last_update_time, create_time
) values (
'update-first', 'store', 'corp', 'EXTRACT', 'ITEM_SELLING_PRICE',
@@ -48,7 +48,7 @@ ifndef::pro[]
----
insert into SYM_TRANSFORM_TABLE (
transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name,
- target_table_name, delete_action, transform_order, column_policy, update_first,
+ target_table_name, update_action, delete_action, transform_order, column_policy, update_first,
last_update_time, create_time
) values (
'delete-action-update-col', 'store', 'corp', 'EXTRACT', 'ITEM_SELLING_PRICE',
diff --git a/symmetric-assemble/src/asciidoc/configuration/transforms/types.ad b/symmetric-assemble/src/asciidoc/configuration/transforms/types.ad
index 8167126c43..9dbbe68840 100644
--- a/symmetric-assemble/src/asciidoc/configuration/transforms/types.ad
+++ b/symmetric-assemble/src/asciidoc/configuration/transforms/types.ad
@@ -198,6 +198,8 @@ This transform copies the left most number of bytes specified.
This transformation determines the target column value by using a query, contained in transform expression
to lookup the value in another table. The query must return a single row, and the first column of the query
is used as the value. Your query references source column names by prefixing with a colon (e.g., :MY_COLUMN).
+Additional you can reference old values with :OLD_COLUMN and previously transformed columns (see transform order) with
+:TRM_COLUMN.
ifndef::pro[]
[source, SQL]
diff --git a/symmetric-assemble/src/asciidoc/configuration/transforms/virtual-columns.ad b/symmetric-assemble/src/asciidoc/configuration/transforms/virtual-columns.ad
index 1e73c5fe7c..ab32993e41 100644
--- a/symmetric-assemble/src/asciidoc/configuration/transforms/virtual-columns.ad
+++ b/symmetric-assemble/src/asciidoc/configuration/transforms/virtual-columns.ad
@@ -22,7 +22,7 @@ ifndef::pro[]
----
insert into SYM_TRANSFORM_TABLE (
transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name,
- target_table_name, delete_action, transform_order, column_policy, update_first,
+ target_table_name, update_action, delete_action, transform_order, column_policy, update_first,
last_update_by, last_update_time, create_time
) values (
'extractStoreItemSellingPriceTransform', 'store', 'corp', 'EXTRACT', 'ITEM_SELLING_PRICE',
@@ -35,7 +35,7 @@ insert into SYM_TRANSFORM_TABLE (
----
insert into SYM_TRANSFORM_TABLE (
transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name,
- target_table_name, delete_action, transform_order, column_policy, update_first,
+ target_table_name, update_action, delete_action, transform_order, column_policy, update_first,
last_update_by, last_update_time, create_time
) values (
'loadCorpItemSellingPriceTransform', 'corp', 'store', 'LOAD', 'ITEM_SELLING_PRICE',
diff --git a/symmetric-assemble/src/docbook/configuration.xml b/symmetric-assemble/src/docbook/configuration.xml
index 2c3ad840ae..4bc3d4b1f3 100644
--- a/symmetric-assemble/src/docbook/configuration.xml
+++ b/symmetric-assemble/src/docbook/configuration.xml
@@ -1242,6 +1242,27 @@ column.
+
+update_action: When a source operation of Update takes place, there are
+three possible ways to handle the transformation at the target. The
+options include:
+
+NONE - The update results in no target changes.
+
+DEL_ROW - The update results in a delete of the row
+as specified by the pk columns defined in the transformation
+configuration.
+
+UPDATE_COL - The update results in an Update
+operation on the target which updates the specific rows and columns
+based on the defined transformation.
+
+BeanShell Script Transform ('bsh'):
+ script code which returns one of the above items.
+ you can use COLUMN variables inside the script.
+
+
+
delete_action: When a source operation of Delete takes place, there are
three possible ways to handle the transformation at the target. The
diff --git a/symmetric-client/pom.xml b/symmetric-client/pom.xml
index 995f11b2c9..f70692a19c 100644
--- a/symmetric-client/pom.xml
+++ b/symmetric-client/pom.xml
@@ -135,11 +135,18 @@
postgresql
provided
+
+
+ com.oracle
+ ojdbc
+ provided
+
net.sourceforge.jtds
jtds
diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/db/JdbcSymmetricDialectFactory.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/JdbcSymmetricDialectFactory.java
index 70108e7e68..52712b4610 100644
--- a/symmetric-client/src/main/java/org/jumpmind/symmetric/db/JdbcSymmetricDialectFactory.java
+++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/JdbcSymmetricDialectFactory.java
@@ -66,7 +66,7 @@
import org.jumpmind.symmetric.db.postgresql.PostgreSqlSymmetricDialect;
import org.jumpmind.symmetric.db.redshift.RedshiftSymmetricDialect;
import org.jumpmind.symmetric.db.sqlanywhere.SqlAnywhereSymmetricDialect;
-import org.jumpmind.symmetric.db.sqlite.SqliteSymmetricDialect;
+import org.jumpmind.symmetric.db.sqlite.SqliteJdbcSymmetricDialect;
import org.jumpmind.symmetric.service.IParameterService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -149,7 +149,7 @@ public ISymmetricDialect create() {
} else if (platform instanceof InterbaseDatabasePlatform) {
dialect = new InterbaseSymmetricDialect(parameterService, platform);
} else if (platform instanceof SqliteDatabasePlatform) {
- dialect = new SqliteSymmetricDialect(parameterService, platform);
+ dialect = new SqliteJdbcSymmetricDialect(parameterService, platform);
} else {
throw new DbNotSupportedException();
}
diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/db/sqlite/SqliteJdbcSymmetricDialect.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/sqlite/SqliteJdbcSymmetricDialect.java
new file mode 100644
index 0000000000..d1c5f70749
--- /dev/null
+++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/sqlite/SqliteJdbcSymmetricDialect.java
@@ -0,0 +1,38 @@
+package org.jumpmind.symmetric.db.sqlite;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import org.jumpmind.db.platform.IDatabasePlatform;
+import org.jumpmind.db.sql.IConnectionCallback;
+import org.jumpmind.db.sql.ISqlTransaction;
+import org.jumpmind.db.sql.JdbcSqlTransaction;
+import org.jumpmind.symmetric.service.IParameterService;
+
+public class SqliteJdbcSymmetricDialect extends SqliteSymmetricDialect {
+
+ public SqliteJdbcSymmetricDialect(IParameterService parameterService, IDatabasePlatform platform) {
+ super(parameterService, platform);
+ }
+
+ @Override
+ protected void setSqliteFunctionResult(ISqlTransaction transaction, final String name, final String result){
+ JdbcSqlTransaction trans = (JdbcSqlTransaction)transaction;
+ trans.executeCallback(new IConnectionCallback() {
+ @Override
+ public Object execute(Connection con) throws SQLException {
+ org.sqlite.SQLiteConnection unwrapped = ((org.sqlite.SQLiteConnection)((org.apache.commons.dbcp.DelegatingConnection)con).getInnermostDelegate());
+
+ org.sqlite.Function.create(unwrapped, name, new org.sqlite.Function() {
+ @Override
+ protected void xFunc() throws SQLException {
+ this.result(result);
+ }
+ });
+
+ return null;
+ }
+ });
+ }
+
+}
diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/ext/OracleBulkDataLoaderFactory.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/ext/OracleBulkDataLoaderFactory.java
index e214933c23..e21a1e463d 100644
--- a/symmetric-client/src/main/java/org/jumpmind/symmetric/ext/OracleBulkDataLoaderFactory.java
+++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/ext/OracleBulkDataLoaderFactory.java
@@ -28,7 +28,6 @@
import org.jumpmind.extension.IBuiltInExtensionPoint;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.db.ISymmetricDialect;
-import org.jumpmind.symmetric.ext.ISymmetricEngineAware;
import org.jumpmind.symmetric.io.OracleBulkDatabaseWriter;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.writer.Conflict;
@@ -36,10 +35,10 @@
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
import org.jumpmind.symmetric.io.data.writer.ResolvedData;
import org.jumpmind.symmetric.io.data.writer.TransformWriter;
-import org.jumpmind.symmetric.load.IDataLoaderFactory;
+import org.jumpmind.symmetric.load.DefaultDataLoaderFactory;
import org.springframework.jdbc.support.nativejdbc.NativeJdbcExtractor;
-public class OracleBulkDataLoaderFactory implements IDataLoaderFactory, ISymmetricEngineAware,
+public class OracleBulkDataLoaderFactory extends DefaultDataLoaderFactory implements ISymmetricEngineAware,
IBuiltInExtensionPoint {
private ISymmetricEngine engine;
@@ -60,11 +59,12 @@ public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetri
int maxRowsBeforeFlush = engine.getParameterService().getInt(
"oracle.bulk.load.max.rows.before.flush", 1000);
return new OracleBulkDatabaseWriter(symmetricDialect.getPlatform(), engine.getTablePrefix(),
- jdbcExtractor, maxRowsBeforeFlush);
+ jdbcExtractor, maxRowsBeforeFlush, buildDatabaseWriterSettings(filters, errorHandlers, conflictSettings, resolvedData));
}
public void setSymmetricEngine(ISymmetricEngine engine) {
this.engine = engine;
+ this.parameterService = engine.getParameterService();
}
public boolean isPlatformSupported(IDatabasePlatform platform) {
diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/OracleBulkDatabaseWriter.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/OracleBulkDatabaseWriter.java
index 8dbca2688a..3f2f94b051 100644
--- a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/OracleBulkDatabaseWriter.java
+++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/OracleBulkDatabaseWriter.java
@@ -42,6 +42,7 @@
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
+import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter;
import org.springframework.jdbc.support.nativejdbc.NativeJdbcExtractor;
@@ -58,8 +59,8 @@ public class OracleBulkDatabaseWriter extends DefaultDatabaseWriter {
protected List> rowArrays = new ArrayList>();
public OracleBulkDatabaseWriter(IDatabasePlatform platform, String procedurePrefix,
- NativeJdbcExtractor jdbcExtractor, int maxRowsBeforeFlush) {
- super(platform);
+ NativeJdbcExtractor jdbcExtractor, int maxRowsBeforeFlush, DatabaseWriterSettings settings) {
+ super(platform, settings);
this.procedurePrefix = procedurePrefix;
this.jdbcExtractor = jdbcExtractor;
this.maxRowsBeforeFlush = maxRowsBeforeFlush;
@@ -89,23 +90,26 @@ public void write(CsvData data) {
case INSERT:
statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT);
statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
- Object[] rowData = platform.getObjectValues(batch.getBinaryEncoding(),
- getRowData(data, CsvData.ROW_DATA), targetTable.getColumns());
- for (int i = 0; i < rowData.length; i++) {
-
- List columnList = null;
- if (rowArrays.size() > i) {
- columnList = rowArrays.get(i);
- } else {
- columnList = new ArrayList();
- rowArrays.add(columnList);
+ if (filterBefore(data)) {
+ Object[] rowData = platform.getObjectValues(batch.getBinaryEncoding(), getRowData(data, CsvData.ROW_DATA),
+ targetTable.getColumns());
+ for (int i = 0; i < rowData.length; i++) {
+
+ List columnList = null;
+ if (rowArrays.size() > i) {
+ columnList = rowArrays.get(i);
+ } else {
+ columnList = new ArrayList();
+ rowArrays.add(columnList);
+ }
+ columnList.add(rowData[i]);
+
+ if (columnList.size() >= maxRowsBeforeFlush) {
+ requiresFlush = true;
+ }
}
- columnList.add(rowData[i]);
-
- if (columnList.size() >= maxRowsBeforeFlush) {
- requiresFlush = true;
- }
- }
+ uncommittedCount++;
+ }
break;
case UPDATE:
super.write(data);
@@ -121,6 +125,8 @@ public void write(CsvData data) {
if (requiresFlush) {
flush();
}
+
+ checkForEarlyCommit();
}
protected void flush() {
diff --git a/symmetric-client/src/test/java/org/jumpmind/symmetric/DbExportImportTest.java b/symmetric-client/src/test/java/org/jumpmind/symmetric/DbExportImportTest.java
index 329a6bd4f5..a0e7a6498f 100644
--- a/symmetric-client/src/test/java/org/jumpmind/symmetric/DbExportImportTest.java
+++ b/symmetric-client/src/test/java/org/jumpmind/symmetric/DbExportImportTest.java
@@ -148,7 +148,7 @@ public void exportTestDatabaseSQL() throws Exception {
String output = export.exportTables(tables).toLowerCase();
Assert.assertEquals(output, 42, StringUtils.countMatches(output, "create table \"sym_"));
- final int EXPECTED_VARCHAR_MAX = engine.getDatabasePlatform().getName().equals(DatabaseNamesConstants.SQLITE) ? 264 : 43;
+ final int EXPECTED_VARCHAR_MAX = engine.getDatabasePlatform().getName().equals(DatabaseNamesConstants.SQLITE) ? 265 : 43;
final String EXPECTED_STRING = "varchar(" + Integer.MAX_VALUE + ")";
Assert.assertEquals("Expected " + EXPECTED_VARCHAR_MAX + " " + EXPECTED_STRING
+ " in the following output: " + output, EXPECTED_VARCHAR_MAX,
diff --git a/symmetric-client/src/test/java/org/jumpmind/symmetric/io/OracleBulkDatabaseWriterTest.java b/symmetric-client/src/test/java/org/jumpmind/symmetric/io/OracleBulkDatabaseWriterTest.java
index cd8634f1f5..bdaad2a05d 100644
--- a/symmetric-client/src/test/java/org/jumpmind/symmetric/io/OracleBulkDatabaseWriterTest.java
+++ b/symmetric-client/src/test/java/org/jumpmind/symmetric/io/OracleBulkDatabaseWriterTest.java
@@ -23,20 +23,18 @@
import java.util.ArrayList;
import java.util.List;
-import junit.framework.Assert;
-
import org.jumpmind.db.DbTestUtils;
import org.jumpmind.db.platform.oracle.OracleDatabasePlatform;
import org.jumpmind.db.util.BasicDataSourcePropertyConstants;
-import org.jumpmind.symmetric.io.OracleBulkDatabaseWriter;
-import org.jumpmind.symmetric.io.AbstractWriterTest.TableCsvData;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataEventType;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor;
+
public class OracleBulkDatabaseWriterTest extends AbstractWriterTest {
@BeforeClass
@@ -58,7 +56,7 @@ public void setupTest() {
@Override
protected long writeData(TableCsvData... datas) {
return writeData(new OracleBulkDatabaseWriter(platform, "sym",
- new CommonsDbcpNativeJdbcExtractor(), 1000), datas);
+ new CommonsDbcpNativeJdbcExtractor(), 1000, null), datas);
}
@Override
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/Constants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/Constants.java
index a4d2d8d099..ea79e32b78 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/Constants.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/Constants.java
@@ -29,10 +29,11 @@
*/
final public class Constants {
- private Constants() {
+ private Constants() {
+
}
-
- public static final String SYSTEM_USER = "system";
+
+ public static final String SYSTEM_USER = "system";
public static final long VIRTUAL_BATCH_FOR_REGISTRATION = IoConstants.IGNORE_TABLES_BATCH;
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java
index 4bdb80bca8..12b7582c3d 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java
@@ -290,8 +290,11 @@ private ParameterConstants() {
public final static String MSSQL_ROW_LEVEL_LOCKS_ONLY = "mssql.allow.only.row.level.locks.on.runtime.tables";
public final static String MSSQL_USE_NTYPES_FOR_SYNC = "mssql.use.ntypes.for.sync";
+
public final static String MSSQL_TRIGGER_EXECUTE_AS = "mssql.trigger.execute.as";
+
+ public final static String SQLITE_TRIGGER_FUNCTION_TO_USE = "sqlite.trigger.function.to.use";
public final static String EXTENSIONS_XML = "extensions.xml";
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/db/sqlite/SqliteSymmetricDialect.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/db/sqlite/SqliteSymmetricDialect.java
index 8f69295ba6..a78bb18d8c 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/db/sqlite/SqliteSymmetricDialect.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/db/sqlite/SqliteSymmetricDialect.java
@@ -25,10 +25,13 @@
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.SqlException;
import org.jumpmind.db.util.BinaryEncoding;
+import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.AbstractSymmetricDialect;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.util.AppUtils;
+import com.mysql.jdbc.StringUtils;
+
public class SqliteSymmetricDialect extends AbstractSymmetricDialect {
public static final String CONTEXT_TABLE_NAME = "context";
@@ -41,13 +44,21 @@ public class SqliteSymmetricDialect extends AbstractSymmetricDialect {
static final String SYNC_TRIGGERS_DISABLED_USER_VARIABLE = "sync_triggers_disabled";
static final String SYNC_TRIGGERS_DISABLED_NODE_VARIABLE = "sync_node_disabled";
+ String sqliteFunctionToOverride;
+
public SqliteSymmetricDialect(IParameterService parameterService, IDatabasePlatform platform) {
super(parameterService, platform);
this.triggerTemplate = new SqliteTriggerTemplate(this);
+
+ sqliteFunctionToOverride = parameterService.getString(ParameterConstants.SQLITE_TRIGGER_FUNCTION_TO_USE);
}
@Override
public void createRequiredDatabaseObjects() {
+ if(!StringUtils.isNullOrEmpty(sqliteFunctionToOverride)){
+ return;
+ }
+
String contextTableName = parameterService.getTablePrefix() + "_" + CONTEXT_TABLE_NAME;
try {
platform.getSqlTemplate().queryForInt("select count(*) from " + contextTableName);
@@ -68,26 +79,47 @@ public void dropRequiredDatabaseObjects() {
public void cleanDatabase() {
}
-
+
+ protected void setSqliteFunctionResult(ISqlTransaction transaction, final String name, final String result) {
+
+ }
public void disableSyncTriggers(ISqlTransaction transaction, String nodeId) {
- String contextTableName = parameterService.getTablePrefix() + "_" + CONTEXT_TABLE_NAME;
- transaction.prepareAndExecute(String.format(CONTEXT_TABLE_INSERT, contextTableName), new Object[] {
- SYNC_TRIGGERS_DISABLED_USER_VARIABLE, "1" });
- if (nodeId != null) {
+ if(StringUtils.isNullOrEmpty(sqliteFunctionToOverride)){
+ String contextTableName = parameterService.getTablePrefix() + "_" + CONTEXT_TABLE_NAME;
transaction.prepareAndExecute(String.format(CONTEXT_TABLE_INSERT, contextTableName), new Object[] {
- SYNC_TRIGGERS_DISABLED_NODE_VARIABLE, nodeId });
- }
+ SYNC_TRIGGERS_DISABLED_USER_VARIABLE, "1" });
+ if (nodeId != null) {
+ transaction.prepareAndExecute(String.format(CONTEXT_TABLE_INSERT, contextTableName), new Object[] {
+ SYNC_TRIGGERS_DISABLED_NODE_VARIABLE, nodeId });
+ }
+ }else{
+ String node = "";
+ if(nodeId != null){
+ node = ":" + nodeId;
+ }
+
+ setSqliteFunctionResult(transaction, sqliteFunctionToOverride, "DISABLED" + node);
+ }
+
}
public void enableSyncTriggers(ISqlTransaction transaction) {
- String contextTableName = parameterService.getTablePrefix() + "_" + CONTEXT_TABLE_NAME;
- transaction.prepareAndExecute("delete from " + contextTableName);
+ if(StringUtils.isNullOrEmpty(sqliteFunctionToOverride)){
+ String contextTableName = parameterService.getTablePrefix() + "_" + CONTEXT_TABLE_NAME;
+ transaction.prepareAndExecute("delete from " + contextTableName);
+ }else{
+ setSqliteFunctionResult(transaction, sqliteFunctionToOverride, "ENABLED");
+ }
}
public String getSyncTriggersExpression() {
- String contextTableName = parameterService.getTablePrefix() + "_" + CONTEXT_TABLE_NAME;
- return "(not exists (select context_value from "+contextTableName+" where id = 'sync_triggers_disabled'))";
+ if(StringUtils.isNullOrEmpty(sqliteFunctionToOverride)){
+ String contextTableName = parameterService.getTablePrefix() + "_" + CONTEXT_TABLE_NAME;
+ return "(not exists (select context_value from "+contextTableName+" where id = 'sync_triggers_disabled'))";
+ }else{
+ return "("+sqliteFunctionToOverride+"() not like 'DISABLED%')";
+ }
}
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/db/sqlite/SqliteTriggerTemplate.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/db/sqlite/SqliteTriggerTemplate.java
index d008a9c71d..8169e5cbba 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/db/sqlite/SqliteTriggerTemplate.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/db/sqlite/SqliteTriggerTemplate.java
@@ -22,14 +22,26 @@
import java.util.HashMap;
+import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.AbstractSymmetricDialect;
import org.jumpmind.symmetric.db.AbstractTriggerTemplate;
+import com.mysql.jdbc.StringUtils;
+
public class SqliteTriggerTemplate extends AbstractTriggerTemplate {
public SqliteTriggerTemplate(AbstractSymmetricDialect symmetricDialect) {
super(symmetricDialect);
+
+ String sqliteFunctionToOverride = symmetricDialect.getParameterService().getString(ParameterConstants.SQLITE_TRIGGER_FUNCTION_TO_USE);
+ String sourceNodeExpression;
+ if(StringUtils.isNullOrEmpty(sqliteFunctionToOverride)){
+ sourceNodeExpression = "(select context_value from $(prefixName)_context where id = 'sync_node_disabled')";
+ }else{
+ sourceNodeExpression = "(select substr(" + sqliteFunctionToOverride + "(), 10) from sqlite_master where " + sqliteFunctionToOverride + "() like 'DISABLED:%')";
+ }
+
// formatter:off
triggerConcatCharacter = "||";
newTriggerValue = "new";
@@ -52,7 +64,7 @@ public SqliteTriggerTemplate(AbstractSymmetricDialect symmetricDialect) {
+ " insert into $(defaultCatalog)$(prefixName)_data (table_name, event_type, trigger_hist_id, row_data, channel_id, transaction_id, source_node_id, external_data, create_time) \n"
+ " values( \n" + " '$(targetTableName)', \n" + " 'I', \n"
+ " $(triggerHistoryId), \n"
- + " $(columns), \n" + " $(channelExpression), null,(select context_value from $(prefixName)_context where id = 'sync_node_disabled'), \n"
+ + " $(columns), \n" + " $(channelExpression), null," + sourceNodeExpression + ", \n"
+ " $(externalSelect), \n" + " strftime('%Y-%m-%d %H:%M:%f','now','localtime') \n" + " ); \n"
+ " $(custom_on_insert_text) \n"
+ "end");
@@ -67,7 +79,7 @@ public SqliteTriggerTemplate(AbstractSymmetricDialect symmetricDialect) {
+ " values( \n" + " '$(targetTableName)', \n" + " 'U', \n"
+ " $(triggerHistoryId), \n" + " $(oldKeys), \n"
+ " $(columns), \n" + " $(oldColumns), \n"
- + " $(channelExpression), null,(select context_value from $(prefixName)_context where id = 'sync_node_disabled'), \n" + " $(externalSelect), \n"
+ + " $(channelExpression), null," + sourceNodeExpression + ", \n" + " $(externalSelect), \n"
+ " strftime('%Y-%m-%d %H:%M:%f','now','localtime') \n" + " ); \n"
+ " $(custom_on_insert_text) \n"
+ "end ");
@@ -81,7 +93,7 @@ public SqliteTriggerTemplate(AbstractSymmetricDialect symmetricDialect) {
+ " insert into $(defaultCatalog)$(prefixName)_data (table_name, event_type, trigger_hist_id, pk_data, old_data, channel_id, transaction_id, source_node_id, external_data, create_time) \n"
+ " values( \n" + " '$(targetTableName)', \n" + " 'D', \n"
+ " $(triggerHistoryId), \n" + " $(oldKeys), \n"
- + " $(oldColumns), \n" + " $(channelExpression), null,(select context_value from $(prefixName)_context where id = 'sync_node_disabled'), \n"
+ + " $(oldColumns), \n" + " $(channelExpression), null," + sourceNodeExpression + ", \n"
+ " $(externalSelect), \n" + " strftime('%Y-%m-%d %H:%M:%f','now','localtime') \n" + " ); \n"
+ " $(custom_on_insert_text) \n"
+ "end");
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/data/transform/LookupColumnTransform.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/data/transform/LookupColumnTransform.java
index d673659342..7140599f37 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/data/transform/LookupColumnTransform.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/data/transform/LookupColumnTransform.java
@@ -69,12 +69,21 @@ public String transform(IDatabasePlatform platform, DataContext context,
if (StringUtils.isNotBlank(sql)) {
ISqlTransaction transaction = context.findTransaction();
List values = null;
+ LinkedCaseInsensitiveMap namedParams = new LinkedCaseInsensitiveMap(sourceValues);
+ if (data.getOldSourceValues() != null && sql.contains(":OLD_")) {
+ for (Map.Entry oldColumn : data.getOldSourceValues().entrySet()) {
+ namedParams.put("OLD_" + oldColumn.getKey().toUpperCase(), oldColumn.getValue());
+ }
+ }
+ if (data.getTargetValues() != null && sql.contains(":TRM_")) {
+ for (Map.Entry transformedCol : data.getTargetValues().entrySet()) {
+ namedParams.put("TRM_" + transformedCol.getKey().toUpperCase(), transformedCol.getValue());
+ }
+ }
if (transaction != null) {
- values = transaction.query(sql, lookupColumnRowMapper, new LinkedCaseInsensitiveMap(
- sourceValues));
+ values = transaction.query(sql, lookupColumnRowMapper, namedParams);
} else {
- values = platform.getSqlTemplate().query(sql, lookupColumnRowMapper,
- new LinkedCaseInsensitiveMap(sourceValues));
+ values = platform.getSqlTemplate().query(sql, lookupColumnRowMapper, namedParams);
}
int rowCount = values.size();
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DynamicDatabaseWriterFilter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DynamicDatabaseWriterFilter.java
index 97684f9e37..186e866d9e 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DynamicDatabaseWriterFilter.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DynamicDatabaseWriterFilter.java
@@ -77,6 +77,8 @@ public static List getDatabaseWriterFilters(ISymmet
databaseWriterFilters.add(new BshDatabaseWriterFilter(engine, entry.getValue()));
} else if (entry.getKey().equals(LoadFilterType.JAVA)) {
databaseWriterFilters.add(new JavaDatabaseWriterFilter(engine, entry.getValue()));
+ } else if (entry.getKey().equals(LoadFilterType.SQL)) {
+ databaseWriterFilters.add(new SQLDatabaseWriterFilter(engine, entry.getValue()));
}
}
}
@@ -123,28 +125,21 @@ protected boolean processLoadFilters(DataContext context, Table table, CsvData d
foundFilters = lookupFilters(foundFilters,
table.getCatalog(), FormatUtils.WILDCARD, FormatUtils.WILDCARD);
- foundFilters = lookupFilters(foundFilters,
+ foundFilters = lookupFilters(foundFilters,
FormatUtils.WILDCARD, FormatUtils.WILDCARD, FormatUtils.WILDCARD);
}
- String tableName = null;
- if (isIgnoreCase()) {
- tableName = table.getName().toUpperCase();
- } else {
- tableName = table.getName();
- }
+ foundFilters = lookupFilters(foundFilters,
+ FormatUtils.WILDCARD, FormatUtils.WILDCARD, table.getName());
foundFilters = lookupFilters(foundFilters,
- FormatUtils.WILDCARD, FormatUtils.WILDCARD, tableName);
-
- foundFilters = lookupFilters(foundFilters,
- FormatUtils.WILDCARD, table.getSchema(), tableName);
+ FormatUtils.WILDCARD, table.getSchema(), table.getName());
foundFilters = lookupFilters(foundFilters,
- table.getCatalog(), FormatUtils.WILDCARD, tableName);
+ table.getCatalog(), FormatUtils.WILDCARD, table.getName());
foundFilters = lookupFilters(foundFilters,
- table.getCatalog(), table.getSchema(), tableName);
+ table.getCatalog(), table.getSchema(), table.getName());
if (foundFilters != null) {
for (LoadFilter filter : foundFilters) {
@@ -159,8 +154,12 @@ protected boolean processLoadFilters(DataContext context, Table table, CsvData d
}
private List lookupFilters(List foundFilters, String catalogName, String schemaName, String tableName) {
- List filters = loadFilters.get(Table.getFullyQualifiedTableName(catalogName,
- schemaName, tableName));
+ String fullyQualifiedTableName = Table.getFullyQualifiedTableName(catalogName, schemaName, tableName);
+ if (isIgnoreCase()) {
+ fullyQualifiedTableName = fullyQualifiedTableName.toUpperCase();
+ }
+ List filters = loadFilters.get(
+ fullyQualifiedTableName);
if (filters != null) {
if (foundFilters == null) {
foundFilters = new ArrayList();
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/SQLDatabaseWriterFilter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/SQLDatabaseWriterFilter.java
new file mode 100644
index 0000000000..b73ad42a85
--- /dev/null
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/SQLDatabaseWriterFilter.java
@@ -0,0 +1,186 @@
+package org.jumpmind.symmetric.load;
+
+import static org.apache.commons.lang.StringUtils.isNotBlank;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.jumpmind.db.model.Table;
+import org.jumpmind.db.sql.ISqlRowMapper;
+import org.jumpmind.db.sql.ISqlTransaction;
+import org.jumpmind.db.sql.Row;
+import org.jumpmind.symmetric.ISymmetricEngine;
+import org.jumpmind.symmetric.SymmetricException;
+import org.jumpmind.symmetric.common.Constants;
+import org.jumpmind.symmetric.io.data.CsvData;
+import org.jumpmind.symmetric.io.data.DataContext;
+import org.jumpmind.symmetric.io.data.DataEventType;
+import org.jumpmind.symmetric.model.Data;
+import org.jumpmind.symmetric.model.LoadFilter;
+import org.jumpmind.util.FormatUtils;
+import org.jumpmind.util.LinkedCaseInsensitiveMap;
+
+import bsh.TargetError;
+
+/**
+ * User: Markus Schulz
+ * Date: 24.08.15
+ * Time: 10:53
+ */
+public class SQLDatabaseWriterFilter extends DynamicDatabaseWriterFilter {
+
+ protected static final ISqlRowMapper lookupColumnRowMapper = new ISqlRowMapper() {
+ @Override
+ public Boolean mapRow(Row row) {
+ boolean result = false;
+ Iterator i = row.values().iterator();
+ if (i.hasNext()) {
+ Object val = i.next();
+ if (val instanceof Boolean) {
+ result = (Boolean)val;
+ } else if (val instanceof Number) {
+ result = ((Number)val).intValue() > 0;
+ } else if (val instanceof String) {
+ result = !((String)val).equals("0") && !((String)val).equalsIgnoreCase("false");
+ }
+ }
+ return result;
+ }
+ };
+
+ private static final String OLD_ = "OLD_";
+
+ public SQLDatabaseWriterFilter(ISymmetricEngine engine, Map> loadFilters) {
+ super(engine, loadFilters);
+ }
+
+ @Override
+ protected boolean processLoadFilters(DataContext context, Table table, CsvData data, Exception error,
+ WriteMethod writeMethod, List loadFiltersForTable) {
+
+ boolean writeRow = true;
+ LoadFilter currentFilter = null;
+ List values = null;
+ try {
+ LinkedCaseInsensitiveMap namedParams = null;
+ for (LoadFilter filter : loadFiltersForTable) {
+ currentFilter = filter;
+ values = null;
+ if (filter.isFilterOnDelete() && data.getDataEventType().equals(DataEventType.DELETE)
+ || filter.isFilterOnInsert() && data.getDataEventType().equals(DataEventType.INSERT)
+ || filter.isFilterOnUpdate() && data.getDataEventType().equals(DataEventType.UPDATE)) {
+ String sql = null;
+ if (writeMethod.equals(WriteMethod.BEFORE_WRITE) && filter.getBeforeWriteScript() != null) {
+ sql = doTokenReplacementOnSql(context, filter.getBeforeWriteScript());
+ }
+ else if (writeMethod.equals(WriteMethod.AFTER_WRITE) && filter.getAfterWriteScript() != null) {
+ sql = doTokenReplacementOnSql(context, filter.getAfterWriteScript());
+ }
+ else if (writeMethod.equals(WriteMethod.HANDLE_ERROR) && filter.getHandleErrorScript() != null) {
+ sql = doTokenReplacementOnSql(context, filter.getHandleErrorScript());
+ }
+ if (sql != null && !sql.trim().isEmpty()) {
+ if (namedParams == null) {
+ namedParams = getVariablesMap(table, data);
+ }
+ ISqlTransaction transaction = context.findTransaction();
+ values = transaction.query(sql, lookupColumnRowMapper, namedParams);
+ }
+
+ if (values != null && values.size() > 0) {
+ writeRow = values.get(0);
+ }
+ }
+ }
+ }
+ catch (Exception ex) {
+ processError(currentFilter, table, ex);
+ }
+ return writeRow;
+ }
+
+ private LinkedCaseInsensitiveMap getVariablesMap(Table table, CsvData data) {
+ LinkedCaseInsensitiveMap namedParams = new LinkedCaseInsensitiveMap();
+ if (data != null) {
+ Map sourceValues = data.toColumnNameValuePairs(table.getColumnNames(), CsvData.ROW_DATA);
+ if (sourceValues.size() > 0) {
+ for (String columnName : sourceValues.keySet()) {
+ namedParams.put(columnName, sourceValues.get(columnName));
+ namedParams.put(columnName.toUpperCase(), sourceValues.get(columnName));
+ }
+ }
+ else {
+ Map pkValues = data.toColumnNameValuePairs(
+ table.getPrimaryKeyColumnNames(), CsvData.PK_DATA);
+ for (String columnName : pkValues.keySet()) {
+ namedParams.put(columnName, pkValues.get(columnName));
+ namedParams.put(columnName.toUpperCase(), pkValues.get(columnName));
+ }
+ }
+
+ Map oldValues = data.toColumnNameValuePairs(table.getColumnNames(),
+ CsvData.OLD_DATA);
+ for (String columnName : oldValues.keySet()) {
+ namedParams.put(OLD_ + columnName, sourceValues.get(columnName));
+ namedParams.put(OLD_ + columnName.toUpperCase(), sourceValues.get(columnName));
+ }
+ }
+ return namedParams;
+ }
+
+ @Override
+ protected void executeScripts(DataContext context, String key, Set scripts, boolean isFailOnError) {
+ if (scripts != null) {
+ try {
+ ISqlTransaction transaction = context.findTransaction();
+ for (String script : scripts) {
+ String sql = doTokenReplacementOnSql(context, script);
+ transaction.query(sql, lookupColumnRowMapper, null);
+ }
+ }
+ catch (Exception e) {
+ if (isFailOnError) {
+ throw (RuntimeException) e;
+ }
+ else {
+ log.error("Failed while executing sql script", e);
+ }
+ }
+ }
+ }
+
+ protected String doTokenReplacementOnSql(DataContext context, String sql) {
+ if (isNotBlank(sql)) {
+ Data csvData = (Data) context.get(Constants.DATA_CONTEXT_CURRENT_CSV_DATA);
+
+ if (csvData != null && csvData.getTriggerHistory() != null) {
+ sql = FormatUtils
+ .replaceToken(sql, "sourceCatalogName", csvData.getTriggerHistory().getSourceCatalogName(), true);
+ }
+
+ if (csvData != null && csvData.getTriggerHistory() != null) {
+ sql = FormatUtils
+ .replaceToken(sql, "sourceSchemaName", csvData.getTriggerHistory().getSourceSchemaName(), true);
+ }
+ }
+ return sql;
+ }
+
+
+ protected void processError(LoadFilter currentFilter, Table table, Throwable ex) {
+ if (ex instanceof TargetError) {
+ ex = ((TargetError) ex).getTarget();
+ }
+ String formattedMessage = String
+ .format("Error executing sql script for load filter %s on table %s. The error was: %s",
+ new Object[]{currentFilter != null ? currentFilter.getLoadFilterId() : "N/A", table.getName(),
+ ex.getMessage()});
+ log.error(formattedMessage);
+ if (currentFilter.isFailOnError()) {
+ throw new SymmetricException(formattedMessage, ex);
+ }
+ }
+
+}
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/LoadFilter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/LoadFilter.java
index 0647c24679..be9940bc41 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/LoadFilter.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/LoadFilter.java
@@ -32,7 +32,7 @@ public class LoadFilter implements Serializable {
static final Logger logger = LoggerFactory.getLogger(LoadFilter.class);
- public enum LoadFilterType { BSH, JAVA };
+ public enum LoadFilterType { BSH, JAVA, SQL };
private String loadFilterId;
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ChannelRouterContext.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ChannelRouterContext.java
index 73dbac8813..71a48bbdb2 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ChannelRouterContext.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ChannelRouterContext.java
@@ -64,6 +64,7 @@ public class ChannelRouterContext extends SimpleRouterContext {
private Data lastDataProcessed;
private List dataEventsToSend = new ArrayList();
private boolean produceCommonBatches = false;
+ private boolean onlyDefaultRoutersAssigned = false;
private long lastLoadId = -1;
private long startDataId;
private long endDataId;
@@ -258,5 +259,13 @@ public void addTransaction(String transactionId) {
this.transactions.add(transactionId);
}
}
+
+ public void setOnlyDefaultRoutersAssigned(boolean onlyDefaultRoutersAssigned) {
+ this.onlyDefaultRoutersAssigned = onlyDefaultRoutersAssigned;
+ }
+
+ public boolean isOnlyDefaultRoutersAssigned() {
+ return onlyDefaultRoutersAssigned;
+ }
}
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapRouteReader.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapRouteReader.java
index 64f9be2aaf..566e6377a9 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapRouteReader.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapRouteReader.java
@@ -121,8 +121,7 @@ public void run() {
}
}
- protected void execute() {
-
+ protected void execute() {
long maxPeekAheadSizeInBytes = (long)(Runtime.getRuntime().maxMemory() * percentOfHeapToUse);
ISymmetricDialect symmetricDialect = engine.getSymmetricDialect();
ISqlReadCursor cursor = null;
@@ -365,13 +364,13 @@ protected String qualifyUsingDataGaps(List dataGaps, int numberOfGapsTo
protected String getSql(String sqlName, Channel channel) {
String select = engine.getRouterService().getSql(sqlName);
- if (!channel.isUseOldDataToRoute()) {
+ if (!channel.isUseOldDataToRoute() || context.isOnlyDefaultRoutersAssigned()) {
select = select.replace("d.old_data", "''");
}
- if (!channel.isUseRowDataToRoute()) {
+ if (!channel.isUseRowDataToRoute() || context.isOnlyDefaultRoutersAssigned()) {
select = select.replace("d.row_data", "''");
}
- if (!channel.isUsePkDataToRoute()) {
+ if (!channel.isUsePkDataToRoute() || context.isOnlyDefaultRoutersAssigned()) {
select = select.replace("d.pk_data", "''");
}
return engine.getSymmetricDialect().massageDataExtractionSql(
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AbstractOfflineDetectorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AbstractOfflineDetectorService.java
index 54b570c655..aeeabf1c6d 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AbstractOfflineDetectorService.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AbstractOfflineDetectorService.java
@@ -38,6 +38,7 @@
import org.jumpmind.symmetric.service.RegistrationRequiredException;
import org.jumpmind.symmetric.transport.AuthenticationException;
import org.jumpmind.symmetric.transport.ConnectionRejectedException;
+import org.jumpmind.symmetric.transport.ServiceUnavailableException;
import org.jumpmind.symmetric.transport.SyncDisabledException;
/**
@@ -63,6 +64,9 @@ protected void fireOffline(Exception error, Node remoteNode, RemoteNodeStatus st
if (isOffline(error)) {
log.warn("Could not communicate with {} at {} because: {}", new Object[] {remoteNode, syncUrl, cause.getMessage()});
status.setStatus(Status.OFFLINE);
+ } else if (isServiceUnavailable(error)) {
+ log.info("{} at {} was unavailable", new Object[] {remoteNode, syncUrl});
+ status.setStatus(Status.OFFLINE);
} else if (isBusy(error)) {
log.info("{} at {} was busy", new Object[] {remoteNode, syncUrl});
status.setStatus(Status.BUSY);
@@ -142,6 +146,16 @@ protected boolean isBusy(Exception ex) {
return offline;
}
+ protected boolean isServiceUnavailable(Exception ex){
+ boolean offline = false;
+ if (ex != null) {
+ Throwable cause = ExceptionUtils.getRootCause(ex);
+ offline = ex instanceof ServiceUnavailableException ||
+ cause instanceof ServiceUnavailableException;
+ }
+ return offline;
+ }
+
protected boolean isSyncDisabled(Exception ex) {
boolean syncDisabled = false;
if (ex != null) {
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java
index 11ac452a89..5f536dd70d 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java
@@ -115,6 +115,7 @@
import org.jumpmind.symmetric.transport.ConnectionRejectedException;
import org.jumpmind.symmetric.transport.IIncomingTransport;
import org.jumpmind.symmetric.transport.ITransportManager;
+import org.jumpmind.symmetric.transport.ServiceUnavailableException;
import org.jumpmind.symmetric.transport.SyncDisabledException;
import org.jumpmind.symmetric.transport.TransportException;
import org.jumpmind.symmetric.transport.http.HttpTransportManager;
@@ -386,7 +387,7 @@ private void logDataReceivedFromPush(Node sourceNode, List batchL
public List loadDataFromOfflineTransport(Node remote, RemoteNodeStatus status, IIncomingTransport transport) throws IOException {
Node local = nodeService.findIdentity();
ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(remote
- .getNodeId(), local.getNodeId(), ProcessType.PULL_JOB));
+ .getNodeId(), local.getNodeId(), ProcessType.OFFLINE_PULL));
List list = null;
try {
list = loadDataFromTransport(processInfo, remote, transport);
@@ -495,6 +496,8 @@ protected void logAndRethrow(Node remoteNode, Throwable ex) throws IOException {
log.warn("Registration attempt failed. Registration was not open");
} else if (ex instanceof ConnectionRejectedException) {
throw (ConnectionRejectedException) ex;
+ } else if (ex instanceof ServiceUnavailableException) {
+ throw (ServiceUnavailableException) ex;
} else if (ex instanceof AuthenticationException) {
log.warn("Could not authenticate with node '{}'",
remoteNode != null ? remoteNode.getNodeId() : "?");
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PullService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PullService.java
index 8feae5541d..6f631ec6bf 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PullService.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PullService.java
@@ -45,10 +45,7 @@
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.service.IPullService;
import org.jumpmind.symmetric.service.IRegistrationService;
-import org.jumpmind.symmetric.transport.AuthenticationException;
-import org.jumpmind.symmetric.transport.ConnectionRejectedException;
-import org.jumpmind.symmetric.transport.SyncDisabledException;
-import org.jumpmind.symmetric.transport.TransportException;
+import org.jumpmind.symmetric.transport.OfflineException;
/**
* @see IPullService
@@ -121,7 +118,6 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
Node node = nodeCommunication.getNode();
if (StringUtils.isNotBlank(node.getSyncUrl()) ||
!parameterService.isRegistrationServer()) {
- try {
int pullCount = 0;
long batchesProcessedCount = 0;
do {
@@ -131,8 +127,26 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
if (pullCount > 1) {
log.info("Immediate pull requested while in reload mode");
}
-
+
+ try {
dataLoaderService.loadDataFromPull(node, status);
+ } catch (ConnectException ex) {
+ log.warn(
+ "Failed to connect to the transport: {}",
+ (node.getSyncUrl() == null ? parameterService.getRegistrationUrl() : node
+ .getSyncUrl()));
+ fireOffline(ex, node, status);
+ } catch (OfflineException ex) {
+ fireOffline(ex, node, status);
+ } catch (UnknownHostException ex) {
+ fireOffline(ex, node, status);
+ } catch (SocketException ex) {
+ log.warn("{}", ex.getMessage());
+ fireOffline(ex, node, status);
+ } catch (IOException ex) {
+ log.error("An IO exception happened while attempting to pull data", ex);
+ fireOffline(ex, node, status);
+ }
if (!status.failed() &&
(status.getDataProcessed() > 0 || status.getBatchesProcessed() > 0)) {
@@ -154,30 +168,7 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
*/
} while (nodeService.isDataLoadStarted() && !status.failed()
&& status.getBatchesProcessed() > batchesProcessedCount);
- } catch (ConnectException ex) {
- log.warn(
- "Failed to connect to the transport: {}",
- (node.getSyncUrl() == null ? parameterService.getRegistrationUrl() : node
- .getSyncUrl()));
- fireOffline(ex, node, status);
- } catch (ConnectionRejectedException ex) {
- fireOffline(ex, node, status);
- } catch (AuthenticationException ex) {
- fireOffline(ex, node, status);
- } catch (UnknownHostException ex) {
- fireOffline(ex, node, status);
- } catch (SyncDisabledException ex) {
- fireOffline(ex, node, status);
- } catch (SocketException ex) {
- log.warn("{}", ex.getMessage());
- fireOffline(ex, node, status);
- } catch (TransportException ex) {
- log.warn("{}", ex.getMessage());
- fireOffline(ex, node, status);
- } catch (IOException ex) {
- log.error("An IO exception happened while attempting to pull data", ex);
- fireOffline(ex, node, status);
- }
+
} else {
log.warn("Cannot pull node '{}' in the group '{}'. The sync url is blank",
node.getNodeId(), node.getNodeGroupId());
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java
index c28693709a..5a14b957d8 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java
@@ -90,7 +90,10 @@
*/
public class RouterService extends AbstractService implements IRouterService {
- protected Map defaultRouterOnlyLastState = new HashMap();
+ protected Map commonBatchesLastKnownState = new HashMap();
+
+
+ protected Map defaultRouterOnlyLastKnownState = new HashMap();
protected transient ExecutorService readThread = null;
@@ -321,15 +324,13 @@ protected int routeDataForEachChannel(DataGapDetector gapDetector) {
try {
final List channels = engine.getConfigurationService().getNodeChannels(
false);
-
for (NodeChannel nodeChannel : channels) {
if (nodeChannel.isEnabled()) {
processInfo.setCurrentChannelId(nodeChannel.getChannelId());
dataCount += routeDataForChannel(processInfo,
nodeChannel,
- sourceNode,
- producesCommonBatches(nodeChannel.getChannel(), parameterService.getNodeGroupId(),
- engine.getTriggerRouterService().getTriggerRouters(false)), gapDetector);
+ sourceNode
+ , gapDetector);
} else {
if (log.isDebugEnabled()) {
log.debug(
@@ -413,26 +414,59 @@ protected boolean producesCommonBatches(Channel channel, String nodeGroupId, Lis
}
}
- if (!producesCommonBatches.equals(defaultRouterOnlyLastState.get(channelId))) {
+ if (!producesCommonBatches.equals(commonBatchesLastKnownState.get(channelId))) {
if (producesCommonBatches) {
log.info("The '{}' channel is in common batch mode", channelId);
} else {
log.info("The '{}' channel is NOT in common batch mode", channelId);
}
- defaultRouterOnlyLastState.put(channelId, producesCommonBatches);
+ commonBatchesLastKnownState.put(channelId, producesCommonBatches);
}
return producesCommonBatches;
}
+
+ protected boolean onlyDefaultRoutersAssigned(Channel channel, String nodeGroupId, List triggerRouters) {
+ String channelId = channel.getChannelId();
+ Boolean onlyDefaultRoutersAssigned = !Constants.CHANNEL_CONFIG.equals(channelId)
+ && !channel.isFileSyncFlag()
+ && !channel.isReloadFlag()
+ && !Constants.CHANNEL_HEARTBEAT.equals(channelId) ? true : false;
+ if (onlyDefaultRoutersAssigned && triggerRouters != null) {
+ for (TriggerRouter triggerRouter : triggerRouters) {
+ if (triggerRouter.getTrigger().getChannelId().equals(channel.getChannelId()) &&
+ triggerRouter.getRouter().getNodeGroupLink().getSourceNodeGroupId()
+ .equals(nodeGroupId) && !"default".equals(triggerRouter.getRouter().getRouterType())) {
+ onlyDefaultRoutersAssigned = false;
+ }
+ }
+ }
+
+ if (!onlyDefaultRoutersAssigned.equals(defaultRouterOnlyLastKnownState.get(channelId))) {
+ if (onlyDefaultRoutersAssigned) {
+ log.info("The '{}' channel for the '{}' node group has only default routers assigned to it. Change data won't be selected during routing", channelId, nodeGroupId);
+ }
+ defaultRouterOnlyLastKnownState.put(channelId, onlyDefaultRoutersAssigned);
+ }
+ return onlyDefaultRoutersAssigned;
+ }
protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nodeChannel, final Node sourceNode,
- boolean produceCommonBatches, DataGapDetector gapDetector) {
+ DataGapDetector gapDetector) {
ChannelRouterContext context = null;
long ts = System.currentTimeMillis();
int dataCount = -1;
try {
+ List triggerRouters = engine.getTriggerRouterService().getTriggerRouters(false);
+ boolean producesCommonBatches = producesCommonBatches(nodeChannel.getChannel(), parameterService.getNodeGroupId(),
+ triggerRouters);
+ boolean onlyDefaultRoutersAssigned = onlyDefaultRoutersAssigned(nodeChannel.getChannel(),
+ parameterService.getNodeGroupId(), triggerRouters);
+
context = new ChannelRouterContext(sourceNode.getNodeId(), nodeChannel,
symmetricDialect.getPlatform().getSqlTemplate().startSqlTransaction());
- context.setProduceCommonBatches(produceCommonBatches);
+ context.setProduceCommonBatches(producesCommonBatches);
+ context.setOnlyDefaultRoutersAssigned(onlyDefaultRoutersAssigned);
+
dataCount = selectDataAndRoute(processInfo, context);
return dataCount;
} catch (DelayRoutingException ex) {
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TransformService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TransformService.java
index 24d4f9350a..83f8d24a6d 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TransformService.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TransformService.java
@@ -42,7 +42,7 @@
import org.jumpmind.symmetric.io.data.transform.ConstantColumnTransform;
import org.jumpmind.symmetric.io.data.transform.CopyColumnTransform;
import org.jumpmind.symmetric.io.data.transform.CopyIfChangedColumnTransform;
-import org.jumpmind.symmetric.io.data.transform.DeleteAction;
+import org.jumpmind.symmetric.io.data.transform.TargetDmlAction;
import org.jumpmind.symmetric.io.data.transform.IColumnTransform;
import org.jumpmind.symmetric.io.data.transform.IdentityColumnTransform;
import org.jumpmind.symmetric.io.data.transform.JavaColumnTransform;
@@ -284,7 +284,7 @@ public void saveTransformTable(TransformTableNodeGroupLink transformTable, boole
.getTargetCatalogName(), transformTable.getTargetSchemaName(), transformTable
.getTargetTableName(), transformTable.getTransformPoint().toString(),
transformTable.isUpdateFirst() ? 1 : 0, transformTable.getDeleteAction()
- .toString(), transformTable.getTransformOrder(), transformTable
+ .toString(), transformTable.getUpdateAction(), transformTable.getTransformOrder(), transformTable
.getColumnPolicy().toString(), transformTable.getLastUpdateTime(),
transformTable.getLastUpdateBy(), transformTable.getTransformId()) == 0) {
transformTable.setCreateTime(new Date());
@@ -296,7 +296,7 @@ public void saveTransformTable(TransformTableNodeGroupLink transformTable, boole
transformTable.getTargetSchemaName(), transformTable.getTargetTableName(),
transformTable.getTransformPoint().toString(), transformTable
.isUpdateFirst() ? 1 : 0, transformTable.getDeleteAction()
- .toString(), transformTable.getTransformOrder(), transformTable
+ .toString(), transformTable.getUpdateAction(), transformTable.getTransformOrder(), transformTable
.getColumnPolicy().toString(), transformTable.getLastUpdateTime(),
transformTable.getLastUpdateBy(), transformTable.getCreateTime(),
transformTable.getTransformId());
@@ -381,8 +381,8 @@ class TransformTableMapper implements ISqlRowMapper
public TransformTableNodeGroupLink mapRow(Row rs) {
TransformTableNodeGroupLink table = new TransformTableNodeGroupLink();
table.setTransformId(rs.getString("transform_id"));
- table.setNodeGroupLink(configurationService.getNodeGroupLinkFor(
- rs.getString("source_node_group_id"), rs.getString("target_node_group_id"), false));
+ table.setNodeGroupLink(configurationService
+ .getNodeGroupLinkFor(rs.getString("source_node_group_id"), rs.getString("target_node_group_id"), false));
table.setSourceCatalogName(rs.getString("source_catalog_name"));
table.setSourceSchemaName(rs.getString("source_schema_name"));
table.setSourceTableName(rs.getString("source_table_name"));
@@ -401,7 +401,8 @@ public TransformTableNodeGroupLink mapRow(Row rs) {
table.setTransformOrder(rs.getInt("transform_order"));
table.setUpdateFirst(rs.getBoolean("update_first"));
table.setColumnPolicy(ColumnPolicy.valueOf(rs.getString("column_policy")));
- table.setDeleteAction(DeleteAction.valueOf(rs.getString("delete_action")));
+ table.setUpdateAction(rs.getString("update_action"));
+ table.setDeleteAction(TargetDmlAction.valueOf(rs.getString("delete_action")));
table.setCreateTime(rs.getDateTime("create_time"));
table.setLastUpdateBy(rs.getString("last_update_by"));
table.setLastUpdateTime(rs.getDateTime("last_update_time"));
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TransformServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TransformServiceSqlMap.java
index 47ae43ed35..4534423b7e 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TransformServiceSqlMap.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TransformServiceSqlMap.java
@@ -38,7 +38,7 @@ public TransformServiceSqlMap(IDatabasePlatform platform, Map re
" target_table_name, " +
" transform_point, " +
" transform_order, " +
-" update_first, delete_action, column_policy, " +
+" update_first, update_action, delete_action, column_policy, " +
" last_update_time, last_update_by, create_time " +
" from " +
" $(transform_table) order by transform_order " +
@@ -81,8 +81,9 @@ public TransformServiceSqlMap(IDatabasePlatform platform, Map re
" target_table_name=?, " +
" transform_point=?, " +
" update_first=?, " +
-" delete_action=?, " +
-" transform_order=?, " +
+" delete_action=?, " +
+" update_action=?, " +
+" transform_order=?, " +
" column_policy=?, " +
" last_update_time=?, " +
" last_update_by=? " +
@@ -111,9 +112,9 @@ public TransformServiceSqlMap(IDatabasePlatform platform, Map re
" (source_node_group_id, target_node_group_id, source_catalog_name, " +
" source_schema_name, source_table_name, " +
" target_catalog_name, target_schema_name, target_table_name, " +
-" transform_point, update_first, delete_action, transform_order, " +
+" transform_point, update_first, delete_action, update_action, transform_order, " +
" column_policy, last_update_time, last_update_by, create_time, transform_id) " +
-" values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) " );
+" values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) " );
putSql("insertTransformColumnSql" ,"" +
"insert into $(transform_column) " +
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/AuthenticationException.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/AuthenticationException.java
index c3909761d3..bcbc9f3832 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/AuthenticationException.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/AuthenticationException.java
@@ -20,9 +20,8 @@
*/
package org.jumpmind.symmetric.transport;
-import org.jumpmind.exception.IoException;
-public class AuthenticationException extends IoException {
+public class AuthenticationException extends OfflineException {
private static final long serialVersionUID = -6322765147037755510L;
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/ConnectionRejectedException.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/ConnectionRejectedException.java
index ff86a96bb8..eff18a6118 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/ConnectionRejectedException.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/ConnectionRejectedException.java
@@ -20,9 +20,8 @@
*/
package org.jumpmind.symmetric.transport;
-import org.jumpmind.exception.IoException;
-public class ConnectionRejectedException extends IoException {
+public class ConnectionRejectedException extends OfflineException {
private static final long serialVersionUID = 3770259092569043530L;
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/OfflineException.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/OfflineException.java
new file mode 100644
index 0000000000..fd2d3f5921
--- /dev/null
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/OfflineException.java
@@ -0,0 +1,21 @@
+package org.jumpmind.symmetric.transport;
+
+import org.jumpmind.exception.IoException;
+
+public class OfflineException extends IoException {
+
+ private static final long serialVersionUID = 1L;
+
+ public OfflineException() {
+ super();
+ }
+
+ public OfflineException(Exception e) {
+ super(e);
+ }
+
+ public OfflineException(String msg, Object... args) {
+ super(msg, args);
+ }
+
+}
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/ServiceUnavailableException.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/ServiceUnavailableException.java
new file mode 100644
index 0000000000..a80135e5e7
--- /dev/null
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/ServiceUnavailableException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to JumpMind Inc under one or more contributor
+ * license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding
+ * copyright ownership. JumpMind Inc licenses this file
+ * to you under the GNU General Public License, version 3.0 (GPLv3)
+ * (the "License"); you may not use this file except in compliance
+ * with the License.
+ *
+ * You should have received a copy of the GNU General Public License,
+ * version 3.0 (GPLv3) along with this library; if not, see
+ * .
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jumpmind.symmetric.transport;
+
+
+public class ServiceUnavailableException extends OfflineException {
+
+ private static final long serialVersionUID = 3770259092569043530L;
+
+}
\ No newline at end of file
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/SyncDisabledException.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/SyncDisabledException.java
index d2d9e832c2..7d2c6d6fd9 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/SyncDisabledException.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/SyncDisabledException.java
@@ -20,9 +20,8 @@
*/
package org.jumpmind.symmetric.transport;
-import org.jumpmind.exception.IoException;
-public class SyncDisabledException extends IoException {
+public class SyncDisabledException extends OfflineException {
private static final long serialVersionUID = 8646739179107142193L;
}
\ No newline at end of file
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/TransportException.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/TransportException.java
index 642c700b9a..49ea04de5b 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/TransportException.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/TransportException.java
@@ -22,7 +22,7 @@
import java.io.IOException;
-public class TransportException extends RuntimeException {
+public class TransportException extends OfflineException {
private static final long serialVersionUID = -6127189404858972114L;
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpIncomingTransport.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpIncomingTransport.java
index d18a5f5e78..bb4786a775 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpIncomingTransport.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpIncomingTransport.java
@@ -26,7 +26,7 @@
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
-
+
import org.apache.commons.io.IOUtils;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.service.IParameterService;
@@ -35,6 +35,7 @@
import org.jumpmind.symmetric.transport.AuthenticationException;
import org.jumpmind.symmetric.transport.ConnectionRejectedException;
import org.jumpmind.symmetric.transport.IIncomingTransport;
+import org.jumpmind.symmetric.transport.ServiceUnavailableException;
import org.jumpmind.symmetric.transport.SyncDisabledException;
import org.jumpmind.symmetric.transport.TransportUtils;
import org.jumpmind.symmetric.web.WebConstants;
@@ -96,9 +97,11 @@ public InputStream openStream() throws IOException {
case WebConstants.REGISTRATION_REQUIRED:
throw new RegistrationRequiredException();
case WebConstants.SYNC_DISABLED:
- throw new SyncDisabledException();
- case WebConstants.SC_SERVICE_UNAVAILABLE:
+ throw new SyncDisabledException();
+ case WebConstants.SC_SERVICE_BUSY:
throw new ConnectionRejectedException();
+ case WebConstants.SC_SERVICE_UNAVAILABLE:
+ throw new ServiceUnavailableException();
case WebConstants.SC_FORBIDDEN:
throw new AuthenticationException();
default:
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpOutgoingTransport.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpOutgoingTransport.java
index 3969938278..36c96faae8 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpOutgoingTransport.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpOutgoingTransport.java
@@ -40,6 +40,7 @@
import org.jumpmind.symmetric.transport.AuthenticationException;
import org.jumpmind.symmetric.transport.ConnectionRejectedException;
import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport;
+import org.jumpmind.symmetric.transport.ServiceUnavailableException;
import org.jumpmind.symmetric.transport.SyncDisabledException;
import org.jumpmind.symmetric.web.WebConstants;
@@ -252,8 +253,10 @@ public BufferedWriter openWriter() {
* @throws {@link AuthenticationException}
*/
private void analyzeResponseCode(int code) throws IOException {
- if (WebConstants.SC_SERVICE_UNAVAILABLE == code) {
+ if (WebConstants.SC_SERVICE_BUSY == code) {
throw new ConnectionRejectedException();
+ } else if (WebConstants.SC_SERVICE_UNAVAILABLE == code) {
+ throw new ServiceUnavailableException();
} else if (WebConstants.SC_FORBIDDEN == code) {
throw new AuthenticationException();
} else if (WebConstants.SYNC_DISABLED == code) {
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/web/WebConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/web/WebConstants.java
index 08327250b1..4135a58e7d 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/web/WebConstants.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/web/WebConstants.java
@@ -53,9 +53,11 @@ public class WebConstants {
public static final int SYNC_DISABLED = 658;
- public static final int SC_FORBIDDEN = 403;
+ public static final int SC_FORBIDDEN = 659;
- public static final int SC_SERVICE_UNAVAILABLE = 503;
+ public static final int SC_SERVICE_UNAVAILABLE = 660;
+
+ public static final int SC_SERVICE_BUSY = 670;
public static final String ACK_BATCH_NAME = "batch-";
diff --git a/symmetric-core/src/main/resources/symmetric-default.properties b/symmetric-core/src/main/resources/symmetric-default.properties
index 78d8e8e566..7a3fc25ee9 100644
--- a/symmetric-core/src/main/resources/symmetric-default.properties
+++ b/symmetric-core/src/main/resources/symmetric-default.properties
@@ -282,7 +282,7 @@ http.compression=true
# DatabaseOverridable: true
# Tags: transport
# Type: boolean
-http.push.stream.output.enabled=false
+http.push.stream.output.enabled=true
# When HTTP chunking is turned on, this is the size to use for each chunk.
#
@@ -601,13 +601,6 @@ cluster.server.id=
# Tags: jobs
cluster.lock.timeout.ms=1800000
-# Enables clustering of jobs.
-#
-# DatabaseOverridable: true
-# Tags: jobs
-# Type: boolean
-cluster.lock.enabled=false
-
# The amount of time a thread can hold a shared or exclusive lock before another thread can break the lock.
# The timeout is a safeguard in case an unexpected exception causes a lock to be abandoned.
# Restarting the service will clear all locks.
@@ -814,7 +807,7 @@ pull.lock.timeout.ms=7200000
#
# DatabaseOverridable: true
# Tags: jobs
-push.thread.per.server.count=1
+push.thread.per.server.count=100
# The amount of time a single push worker node_communication lock will timeout after.
#
diff --git a/symmetric-core/src/main/resources/symmetric-schema.xml b/symmetric-core/src/main/resources/symmetric-schema.xml
index c7b14a4f27..60653d3b0e 100644
--- a/symmetric-core/src/main/resources/symmetric-schema.xml
+++ b/symmetric-core/src/main/resources/symmetric-schema.xml
@@ -280,7 +280,7 @@
-
+
@@ -656,6 +656,7 @@
+
diff --git a/symmetric-db/src/main/java/org/jumpmind/db/platform/interbase/InterbaseDdlBuilder.java b/symmetric-db/src/main/java/org/jumpmind/db/platform/interbase/InterbaseDdlBuilder.java
index 8dcdddf553..abf05aca8d 100644
--- a/symmetric-db/src/main/java/org/jumpmind/db/platform/interbase/InterbaseDdlBuilder.java
+++ b/symmetric-db/src/main/java/org/jumpmind/db/platform/interbase/InterbaseDdlBuilder.java
@@ -90,8 +90,8 @@ public InterbaseDdlBuilder() {
databaseInfo.setHasSize(Types.BINARY, false);
databaseInfo.setHasSize(Types.VARBINARY, false);
- databaseInfo.setNonBlankCharColumnSpacePadded(true);
- databaseInfo.setBlankCharColumnSpacePadded(true);
+ databaseInfo.setNonBlankCharColumnSpacePadded(false);
+ databaseInfo.setBlankCharColumnSpacePadded(false);
databaseInfo.setCharColumnSpaceTrimmed(false);
databaseInfo.setEmptyStringNulled(false);
diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/transform/DeleteAction.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/transform/TargetDmlAction.java
similarity index 92%
rename from symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/transform/DeleteAction.java
rename to symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/transform/TargetDmlAction.java
index 3e1813cc5d..9c940cf348 100644
--- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/transform/DeleteAction.java
+++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/transform/TargetDmlAction.java
@@ -20,8 +20,8 @@
*/
package org.jumpmind.symmetric.io.data.transform;
-public enum DeleteAction {
+public enum TargetDmlAction {
- NONE, UPDATE_COL, DEL_ROW;
+ NONE, UPDATE_COL, INS_ROW, UPD_ROW, DEL_ROW;
}
diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/transform/TransformTable.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/transform/TransformTable.java
index 40eb9f006d..767a69629a 100644
--- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/transform/TransformTable.java
+++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/transform/TransformTable.java
@@ -20,16 +20,27 @@
*/
package org.jumpmind.symmetric.io.data.transform;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
+import java.util.*;
+import bsh.Interpreter;
import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.model.Table;
+import org.jumpmind.symmetric.io.data.*;
import org.jumpmind.symmetric.io.data.transform.TransformColumn.IncludeOnType;
+import org.jumpmind.util.Context;
+import org.slf4j.*;
public class TransformTable implements Cloneable {
+ final String INTERPRETER_KEY = String.format("%s.BshInterpreter", getClass().getName());
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ /*
+ * Static context object used to maintain objects in memory for reference between BSH transforms.
+ */
+ private static Map bshContext = new HashMap();
+
protected String transformId;
protected String sourceCatalogName;
protected String sourceSchemaName;
@@ -40,7 +51,8 @@ public class TransformTable implements Cloneable {
protected TransformPoint transformPoint;
protected List transformColumns;
protected List primaryKeyColumns;
- protected DeleteAction deleteAction = DeleteAction.DEL_ROW;
+ protected String updateAction = TargetDmlAction.UPDATE_COL.name();
+ protected TargetDmlAction deleteAction = TargetDmlAction.DEL_ROW;
protected ColumnPolicy columnPolicy = ColumnPolicy.IMPLIED;
protected boolean updateFirst = false;
protected int transformOrder = 0;
@@ -197,12 +209,83 @@ public void addTransformColumn(TransformColumn column) {
primaryKeyColumns.add(column);
}
}
+
+ public void setUpdateAction(String updateAction) {
+ this.updateAction = updateAction;
+ }
+
+ public String getUpdateAction() {
+ return updateAction;
+ }
+
+ public TargetDmlAction evaluateTargetDmlAction(DataContext dataContext, TransformedData transformedData) {
+ TargetDmlAction action = null;
+ try {
+ action = TargetDmlAction.valueOf(updateAction);
+ } catch (Exception ex) {
+
+ }
+ if (action == null) {
+ Interpreter interpreter = getInterpreter(dataContext);
+ Map sourceValues = transformedData.getSourceValues();
+
+ try {
+ interpreter.set("sourceDmlType", transformedData.getSourceDmlType());
+ interpreter.set("sourceDmlTypeString", transformedData.getSourceDmlType().toString());
+ interpreter.set("transformedData", transformedData);
+ CsvData csvData = dataContext.getData();
+ if (csvData != null) {
+ interpreter.set("externalData", csvData.getAttribute("externalData"));
+ }
+ else {
+ interpreter.set("externalData", null);
+ }
+ for (String columnName : sourceValues.keySet()) {
+ interpreter.set(columnName.toUpperCase(), sourceValues.get(columnName));
+ interpreter.set(columnName, sourceValues.get(columnName));
+ }
+ if (transformedData.getOldSourceValues() != null) {
+ for (Map.Entry oldColumn : transformedData.getOldSourceValues().entrySet()) {
+ interpreter.set("OLD_" + oldColumn.getKey(), oldColumn.getValue());
+ interpreter.set("OLD_" + oldColumn.getKey().toUpperCase(), oldColumn.getValue());
+ }
+ }
+ String transformExpression = updateAction;
+ String methodName = String.format("transform_%d()", Math.abs(transformExpression.hashCode()));
+ if (dataContext.get(methodName) == null) {
+ //create BSH-Method if not exists in Context
+ interpreter.set("context", dataContext);
+ interpreter.set("bshContext", bshContext);
+ interpreter.eval(String.format("%s {\n%s\n}", methodName, transformExpression));
+ dataContext.put(methodName, Boolean.TRUE);
+ }
+ //call BSH-Method
+ Object result = interpreter.eval(methodName);
+ //evaluate Result of BSH-Script
+ action = TargetDmlAction.valueOf((String) result);
+ }
+ catch (Exception e) {
+ log.error(e.getMessage(), e);
+ }
+ }
+ return action;
+ }
+
+ protected Interpreter getInterpreter(Context context) {
+ Interpreter interpreter = (Interpreter) context.get(INTERPRETER_KEY);
+ if (interpreter == null) {
+ interpreter = new Interpreter();
+ context.put(INTERPRETER_KEY, interpreter);
+ }
+ return interpreter;
+ }
+
- public void setDeleteAction(DeleteAction deleteAction) {
+ public void setDeleteAction(TargetDmlAction deleteAction) {
this.deleteAction = deleteAction;
}
- public DeleteAction getDeleteAction() {
+ public TargetDmlAction getDeleteAction() {
return deleteAction;
}
diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/TransformWriter.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/TransformWriter.java
index bfd2b3775f..1d1743e739 100644
--- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/TransformWriter.java
+++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/TransformWriter.java
@@ -35,7 +35,7 @@
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.IDataWriter;
-import org.jumpmind.symmetric.io.data.transform.DeleteAction;
+import org.jumpmind.symmetric.io.data.transform.TargetDmlAction;
import org.jumpmind.symmetric.io.data.transform.IColumnTransform;
import org.jumpmind.symmetric.io.data.transform.IgnoreColumnException;
import org.jumpmind.symmetric.io.data.transform.IgnoreRowException;
@@ -314,27 +314,42 @@ protected boolean perform(DataContext context, TransformedData data,
// perform a transformation if there are columns defined for
// transformation
if (data.getColumnNames().length > 0) {
- if (data.getTargetDmlType() != DataEventType.DELETE) {
- persistData = true;
- } else {
- // handle the delete action
- DeleteAction deleteAction = transformation.getDeleteAction();
- switch (deleteAction) {
+ TargetDmlAction targetAction = null;
+ switch (data.getTargetDmlType()) {
+ case INSERT:
+ case UPDATE:
+ targetAction = transformation.evaluateTargetDmlAction(context, data);
+ break;
+ case DELETE:
+ targetAction = transformation.getDeleteAction();
+ break;
+ default:
+ persistData = true;
+ }
+ if (targetAction != null) {
+ // how to handle the update/delete action on target..
+ switch (targetAction) {
case DEL_ROW:
data.setTargetDmlType(DataEventType.DELETE);
persistData = true;
break;
case UPDATE_COL:
+ case UPD_ROW:
data.setTargetDmlType(DataEventType.UPDATE);
persistData = true;
break;
+ case INS_ROW:
+ data.setTargetDmlType(DataEventType.INSERT);
+ persistData = true;
+ break;
case NONE:
default:
if (log.isDebugEnabled()) {
log.debug(
- "The {} transformation is not configured to delete row. Not sending the delete through.",
- transformation.getTransformId());
+ "The {} transformation is not configured to delete row. Not sending the delete through.",
+ transformation.getTransformId());
}
+ break;
}
}
}
diff --git a/symmetric-io/src/test/java/org/jumpmind/symmetric/io/data/writer/TransformWriterTest.java b/symmetric-io/src/test/java/org/jumpmind/symmetric/io/data/writer/TransformWriterTest.java
index 1af9c82d5a..8c08faa338 100644
--- a/symmetric-io/src/test/java/org/jumpmind/symmetric/io/data/writer/TransformWriterTest.java
+++ b/symmetric-io/src/test/java/org/jumpmind/symmetric/io/data/writer/TransformWriterTest.java
@@ -28,28 +28,8 @@
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Table;
import org.jumpmind.symmetric.io.AbstractWriterTest;
-import org.jumpmind.symmetric.io.data.CsvData;
-import org.jumpmind.symmetric.io.data.DataEventType;
-import org.jumpmind.symmetric.io.data.transform.AdditiveColumnTransform;
-import org.jumpmind.symmetric.io.data.transform.BinaryLeftColumnTransform;
-import org.jumpmind.symmetric.io.data.transform.ClarionDateTimeColumnTransform;
-import org.jumpmind.symmetric.io.data.transform.ColumnsToRowsKeyColumnTransform;
-import org.jumpmind.symmetric.io.data.transform.ColumnsToRowsValueColumnTransform;
-import org.jumpmind.symmetric.io.data.transform.ConstantColumnTransform;
-import org.jumpmind.symmetric.io.data.transform.CopyColumnTransform;
-import org.jumpmind.symmetric.io.data.transform.CopyIfChangedColumnTransform;
-import org.jumpmind.symmetric.io.data.transform.IColumnTransform;
-import org.jumpmind.symmetric.io.data.transform.IdentityColumnTransform;
-import org.jumpmind.symmetric.io.data.transform.JavaColumnTransform;
-import org.jumpmind.symmetric.io.data.transform.LeftColumnTransform;
-import org.jumpmind.symmetric.io.data.transform.MathColumnTransform;
-import org.jumpmind.symmetric.io.data.transform.MultiplierColumnTransform;
-import org.jumpmind.symmetric.io.data.transform.RemoveColumnTransform;
-import org.jumpmind.symmetric.io.data.transform.SubstrColumnTransform;
-import org.jumpmind.symmetric.io.data.transform.TransformColumn;
-import org.jumpmind.symmetric.io.data.transform.TransformPoint;
-import org.jumpmind.symmetric.io.data.transform.TransformTable;
-import org.jumpmind.symmetric.io.data.transform.ValueMapColumnTransform;
+import org.jumpmind.symmetric.io.data.*;
+import org.jumpmind.symmetric.io.data.transform.*;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -81,8 +61,8 @@ public void testNoTransform() {
public void testTableNameChange() {
mockWriter.reset();
Table table = new Table("s1", new Column("id"));
- writeData(getTransformWriter(), new TableCsvData(table, new CsvData(DataEventType.INSERT,
- new String[] { "66" }), new CsvData(DataEventType.INSERT, new String[] { "77" })));
+ writeData(getTransformWriter(), new TableCsvData(table, new CsvData(DataEventType.INSERT, new String[]{"66"}),
+ new CsvData(DataEventType.INSERT, new String[]{"77"})));
List datas = mockWriter.writtenDatas.get(table.getFullyQualifiedTableName());
Assert.assertNull(datas);
datas = mockWriter.writtenDatas.get("t1");
@@ -95,8 +75,8 @@ public void testTableNameChange() {
public void testAddColumn() {
mockWriter.reset();
Table table = new Table("s2", new Column("id"));
- writeData(getTransformWriter(), new TableCsvData(table, new CsvData(DataEventType.INSERT,
- new String[] { "2" }), new CsvData(DataEventType.INSERT, new String[] { "1" })));
+ writeData(getTransformWriter(), new TableCsvData(table, new CsvData(DataEventType.INSERT, new String[]{"2"}),
+ new CsvData(DataEventType.INSERT, new String[]{"1"})));
List datas = mockWriter.writtenDatas.get(table.getFullyQualifiedTableName());
Assert.assertNull(datas);
datas = mockWriter.writtenDatas.get("t2");
@@ -108,6 +88,25 @@ public void testAddColumn() {
}
+ @Test
+ public void testUpdateActionBeanShellScript() throws Exception {
+ mockWriter.reset();
+ Table table = new Table("s3", new Column("id"));
+ writeData(getTransformWriter(), new TableCsvData(table,
+ new CsvData(DataEventType.UPDATE, new String[]{"1"}),
+ new CsvData(DataEventType.UPDATE, new String[]{"2"}),
+ new CsvData(DataEventType.UPDATE, new String[]{"3"}),
+ new CsvData(DataEventType.UPDATE, new String[]{"4"}),
+ new CsvData(DataEventType.UPDATE, new String[]{"5"})));
+ List datas = mockWriter.writtenDatas.get("t3");
+ Assert.assertEquals(4, datas.size());
+ Assert.assertEquals(DataEventType.INSERT, datas.get(0).getDataEventType());
+ Assert.assertEquals(DataEventType.DELETE, datas.get(1).getDataEventType());
+ Assert.assertEquals(DataEventType.UPDATE, datas.get(2).getDataEventType());
+ Assert.assertEquals(DataEventType.UPDATE, datas.get(3).getDataEventType());
+ }
+
+
@Test
public void testSimpleTableBeanShellMapping() throws Exception {
}
@@ -125,11 +124,17 @@ public void testIgnoreRowExceptionFromBshMapping() throws Exception {
}
protected TransformWriter getTransformWriter() {
+ TransformTable transformTable3 =
+ new TransformTable("s3", "t3", TransformPoint.LOAD, new TransformColumn("id", "id", true));
+ transformTable3.setUpdateAction("switch (id) { case \"1\": return \"INS_ROW\"; case \"2\": "
+ + "return \"DEL_ROW\"; case \"3\": return \"UPD_ROW\"; case \"4\": return \"NONE\"; case \"5\": "
+ + "return \"UPDATE_COL\"; }");
return new TransformWriter(platform, TransformPoint.LOAD, mockWriter, buildDefaultColumnTransforms(), new TransformTable[] {
- new TransformTable("s1", "t1", TransformPoint.LOAD, new TransformColumn("id", "id",
- true)),
- new TransformTable("s2", "t2", TransformPoint.LOAD, new TransformColumn("id", "id",
- true), new TransformColumn(null, "col2", false, "const", "added")) });
+ new TransformTable("s1", "t1", TransformPoint.LOAD, new TransformColumn("id", "id", true)),
+ new TransformTable("s2", "t2", TransformPoint.LOAD, new TransformColumn("id", "id", true),
+ new TransformColumn(null, "col2", false, "const", "added")),
+ transformTable3
+ });
}
public static Map> buildDefaultColumnTransforms() {
diff --git a/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/oracle/OracleDdlReader.java b/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/oracle/OracleDdlReader.java
index a28495993b..6d0e6b2bd7 100644
--- a/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/oracle/OracleDdlReader.java
+++ b/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/oracle/OracleDdlReader.java
@@ -136,6 +136,8 @@ protected Integer mapUnknownJdbcTypeForColumn(Map values) {
return Types.DOUBLE;
} else if (typeName != null && typeName.startsWith("BFILE")) {
return Types.VARCHAR;
+ } else if (typeName != null && typeName.startsWith("INTERVAL")) {
+ return Types.VARCHAR;
} else {
return super.mapUnknownJdbcTypeForColumn(values);
}
diff --git a/symmetric-jdbc/src/main/java/org/jumpmind/db/sql/JdbcSqlTransaction.java b/symmetric-jdbc/src/main/java/org/jumpmind/db/sql/JdbcSqlTransaction.java
index c896881e46..7341fb88a3 100644
--- a/symmetric-jdbc/src/main/java/org/jumpmind/db/sql/JdbcSqlTransaction.java
+++ b/symmetric-jdbc/src/main/java/org/jumpmind/db/sql/JdbcSqlTransaction.java
@@ -331,7 +331,7 @@ public Integer execute(Connection con) throws SQLException {
});
}
- protected T executeCallback(IConnectionCallback callback) {
+ public T executeCallback(IConnectionCallback callback) {
try {
return callback.execute(this.connection);
} catch (SQLException ex) {
diff --git a/symmetric-parent/pom.xml b/symmetric-parent/pom.xml
index f96cda77c0..08e7f44c7c 100644
--- a/symmetric-parent/pom.xml
+++ b/symmetric-parent/pom.xml
@@ -24,7 +24,7 @@
1.3.176
3.7.15-M1
9.2-1002-jdbc4
- 5.1.30
+ 5.1.30
1.1.7
1.7.7
1.5.3
@@ -52,10 +52,17 @@
-
- releases
- http://repository.codehaus.org
-
+
+ Nexus
+ Maven2 Repository from AntzSystem GmbH
+ http://ferrari.onesty-tech.loc:8888/nexus/content/groups/public
+
+
+ codehaus-mule-repo
+ codehaus-mule-repo
+ https://repository-master.mulesoft.org/nexus/content/groups/public/
+ default
+
java.net
http://download.java.net/maven/glassfish
@@ -356,10 +363,10 @@
**/org/jumpmind/db/platform/db2/Db2DatabasePlatform.java
**/org/jumpmind/db/platform/hsqldb/HsqlDbDatabasePlatform.java
**/org/jumpmind/db/platform/hsqldb/package.html
- **/org/jumpmind/db/platform/hsqldb/HsqlDbDdlReader.java
+ **/org/jumpmind/db/platform/hsqldb/HsqlDbDdlReader.java
-
+
maven-eclipse-plugin
2.9
@@ -389,7 +396,7 @@
1.0
-
+
maven-jar-plugin
2.3.2
@@ -405,13 +412,13 @@
${project.groupId}
- ${project.artifactId}
+ ${project.artifactId}
${maven.build.timestamp}
${project.version}
${BUILD_NUMBER}
${JOB_NAME}
-
+
${basedir}/target
@@ -446,23 +453,23 @@
- ${user.home}/.symmetricds/lib/ojdbc6.jar
+ ${user.home}/.symmetricds/lib/ojdbc6.jar
- ${user.home}/.symmetricds/lib/ojdbc5.jar
+ ${user.home}/.symmetricds/lib/ojdbc5.jar
- ${user.home}/.symmetricds/lib/ojdbc14.jar
+ ${user.home}/.symmetricds/lib/ojdbc14.jar
- ${user.home}/.symmetricds/lib/db2jcc.jar
+ ${user.home}/.symmetricds/lib/db2jcc.jar
- ${user.home}/.symmetricds/lib/ifxjdbc.jar
+ ${user.home}/.symmetricds/lib/ifxjdbc.jar
- ${user.home}/.symmetricds/lib/ifxlang.jar
+ ${user.home}/.symmetricds/lib/ifxlang.jar
- ${user.home}/.symmetricds/lib/interclient.jar
+ ${user.home}/.symmetricds/lib/interclient.jar
- ${user.home}/.symmetricds/lib/sqljdbc4.jar
+ ${user.home}/.symmetricds/lib/sqljdbc4.jar
- ${user.home}/.symmetricds/lib/jconn4.jar
+ ${user.home}/.symmetricds/lib/jconn4.jar
@@ -552,28 +559,28 @@
org.jumpmind.symmetric
symmetric-oracle
- 3.7.0-SNAPSHOT
-
+ 3.7.0-SNAPSHOT
+
org.jumpmind.symmetric
symmetric-postgres
- 3.7.0-SNAPSHOT
-
+ 3.7.0-SNAPSHOT
+
org.jumpmind.symmetric
symmetric-mssql
- 3.7.0-SNAPSHOT
+ 3.7.0-SNAPSHOT
org.jumpmind.symmetric
symmetric-mongo
- 3.7.0-SNAPSHOT
-
+ 3.7.0-SNAPSHOT
+
org.jumpmind.symmetric
symmetric-ftp
- 3.7.0-SNAPSHOT
-
+ 3.7.0-SNAPSHOT
+
org.jumpmind.symmetric
symmetric-core
@@ -605,7 +612,7 @@
javax.mail
mail
1.4.5
-
+
com.google.code.gson
gson
@@ -625,7 +632,7 @@
org.slf4j
jul-to-slf4j
${version.slf4j}
-
+
org.slf4j
slf4j-api
@@ -658,7 +665,7 @@
javax.jms
jms
-
+
@@ -796,7 +803,7 @@
commons-beanutils
commons-beanutils
1.9.2
-
+
commons-dbcp
commons-dbcp
@@ -912,12 +919,12 @@
mysql
mysql-connector-java
${version.mysql}
-
+
org.mongodb
mongo-java-driver
2.12.3
-
+
org.mariadb.jdbc
mariadb-java-client
@@ -932,8 +939,13 @@
org.jumpmind.symmetric.jdbc
ojdbc
11.2.0.3
-
-
+
+
+ com.oracle
+ ojdbc
+ 11.2.0.4
+
+
net.sourceforge.jtds
jtds
1.2.4
diff --git a/symmetric-server/src/main/deploy/conf/sym_service.conf b/symmetric-server/src/main/deploy/conf/sym_service.conf
index cdb66b2060..5f2e99d8e6 100644
--- a/symmetric-server/src/main/deploy/conf/sym_service.conf
+++ b/symmetric-server/src/main/deploy/conf/sym_service.conf
@@ -31,7 +31,7 @@ wrapper.java.additional.12=-Djava.net.preferIPv4Stack=true
wrapper.java.initmemory=256
# Maximum Java Heap Size (in MB)
-wrapper.java.maxmemory=256
+wrapper.java.maxmemory=1024
#********************************************************************
# Wrapper Application Properties
diff --git a/symmetric-server/src/main/java/org/jumpmind/symmetric/SymmetricWebServer.java b/symmetric-server/src/main/java/org/jumpmind/symmetric/SymmetricWebServer.java
index d36e53e908..1aef27b9a2 100644
--- a/symmetric-server/src/main/java/org/jumpmind/symmetric/SymmetricWebServer.java
+++ b/symmetric-server/src/main/java/org/jumpmind/symmetric/SymmetricWebServer.java
@@ -20,29 +20,47 @@
*/
package org.jumpmind.symmetric;
+import static org.apache.commons.lang.StringUtils.isNotBlank;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
+import java.util.Enumeration;
import javax.management.Attribute;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
import mx4j.tools.adaptor.http.HttpAdaptor;
import mx4j.tools.adaptor.http.XSLTProcessor;
import org.apache.commons.lang.StringUtils;
+import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.security.ConstraintMapping;
import org.eclipse.jetty.security.ConstraintSecurityHandler;
import org.eclipse.jetty.security.HashLoginService;
import org.eclipse.jetty.security.authentication.BasicAuthenticator;
import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.SessionManager;
-import org.eclipse.jetty.server.bio.SocketConnector;
-import org.eclipse.jetty.server.nio.SelectChannelConnector;
-import org.eclipse.jetty.server.ssl.SslSocketConnector;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.SslConnectionFactory;
+import org.eclipse.jetty.server.session.AbstractSession;
+import org.eclipse.jetty.server.session.HashSessionManager;
+import org.eclipse.jetty.server.session.HashedSession;
import org.eclipse.jetty.util.security.Constraint;
import org.eclipse.jetty.util.security.Password;
import org.eclipse.jetty.util.ssl.SslContextFactory;
@@ -74,17 +92,13 @@ public class SymmetricWebServer {
protected static final Logger log = LoggerFactory.getLogger(SymmetricWebServer.class);
- protected static final String DEFAULT_WEBAPP_DIR = System.getProperty(
- SystemConstants.SYSPROP_WEB_DIR, AppUtils.getSymHome() + "/web");
+ protected static final String DEFAULT_WEBAPP_DIR = System.getProperty(SystemConstants.SYSPROP_WEB_DIR, AppUtils.getSymHome() + "/web");
- public static final String DEFAULT_HTTP_PORT = System.getProperty(
- SystemConstants.SYSPROP_DEFAULT_HTTP_PORT, "31415");
+ public static final String DEFAULT_HTTP_PORT = System.getProperty(SystemConstants.SYSPROP_DEFAULT_HTTP_PORT, "31415");
- public static final String DEFAULT_JMX_PORT = System.getProperty(
- SystemConstants.SYSPROP_DEFAULT_JMX_PORT, "31416");
+ public static final String DEFAULT_JMX_PORT = System.getProperty(SystemConstants.SYSPROP_DEFAULT_JMX_PORT, "31416");
- public static final String DEFAULT_HTTPS_PORT = System.getProperty(
- SystemConstants.SYSPROP_DEFAULT_HTTPS_PORT, "31417");
+ public static final String DEFAULT_HTTPS_PORT = System.getProperty(SystemConstants.SYSPROP_DEFAULT_HTTPS_PORT, "31417");
public static final int DEFAULT_MAX_IDLE_TIME = 7200000;
@@ -130,11 +144,11 @@ public enum Mode {
protected boolean noDirectBuffer = false;
protected String webAppDir = DEFAULT_WEBAPP_DIR;
-
+
protected String name = "SymmetricDS";
-
+
protected String httpSslVerifiedServerNames = "all";
-
+
protected boolean allowSelfSignedCerts = true;
public SymmetricWebServer() {
@@ -150,8 +164,7 @@ public SymmetricWebServer(int maxIdleTime, String propertiesUrl) {
this.maxIdleTime = maxIdleTime;
}
- public SymmetricWebServer(String webDirectory, int maxIdleTime, String propertiesUrl,
- boolean join, boolean noNio, boolean noDirectBuffer) {
+ public SymmetricWebServer(String webDirectory, int maxIdleTime, String propertiesUrl, boolean join, boolean noNio, boolean noDirectBuffer) {
this(propertiesUrl, webDirectory);
this.maxIdleTime = maxIdleTime;
this.join = join;
@@ -166,7 +179,7 @@ public SymmetricWebServer(String propertiesUrl, String webappDir) {
}
protected void initFromProperties() {
-
+
try {
Class.forName(AbstractCommandLauncher.class.getName());
} catch (ClassNotFoundException e) {
@@ -185,14 +198,11 @@ protected void initFromProperties() {
Integer.parseInt(System.getProperty(ServerConstants.HTTPS_PORT, "" + httpsPort)));
jmxPort = serverProperties.getInt(ServerConstants.JMX_HTTP_PORT,
Integer.parseInt(System.getProperty(ServerConstants.JMX_HTTP_PORT, "" + jmxPort)));
- host = serverProperties.get(ServerConstants.HOST_BIND_NAME,
- System.getProperty(ServerConstants.HOST_BIND_NAME, host));
+ host = serverProperties.get(ServerConstants.HOST_BIND_NAME, System.getProperty(ServerConstants.HOST_BIND_NAME, host));
httpSslVerifiedServerNames = serverProperties.get(ServerConstants.HTTPS_VERIFIED_SERVERS,
- System.getProperty(ServerConstants.HTTPS_VERIFIED_SERVERS,
- httpSslVerifiedServerNames));
+ System.getProperty(ServerConstants.HTTPS_VERIFIED_SERVERS, httpSslVerifiedServerNames));
allowSelfSignedCerts = serverProperties.is(ServerConstants.HTTPS_ALLOW_SELF_SIGNED_CERTS,
- Boolean.parseBoolean(System.getProperty(
- ServerConstants.HTTPS_ALLOW_SELF_SIGNED_CERTS, "" + allowSelfSignedCerts)));
+ Boolean.parseBoolean(System.getProperty(ServerConstants.HTTPS_ALLOW_SELF_SIGNED_CERTS, "" + allowSelfSignedCerts)));
}
@@ -209,8 +219,7 @@ public SymmetricWebServer start() throws Exception {
} else if (httpsPort > 0 && httpsEnabled) {
return startSecure(httpsPort, jmxPort);
} else {
- throw new IllegalStateException(
- "Either an http or https port needs to be set before starting the server.");
+ throw new IllegalStateException("Either an http or https port needs to be set before starting the server.");
}
}
@@ -233,32 +242,37 @@ public SymmetricWebServer startMixed(int httpPort, int secureHttpPort, int jmxPo
public SymmetricWebServer start(int httpPort, int securePort, int httpJmxPort, Mode mode) throws Exception {
TransportManagerFactory.initHttps(httpSslVerifiedServerNames, allowSelfSignedCerts);
-
+
// indicate to the app that we are in stand alone mode
System.setProperty(SystemConstants.SYSPROP_STANDALONE_WEB, "true");
server = new Server();
- server.setConnectors(getConnectors(httpPort, securePort, mode));
+ server.setConnectors(getConnectors(server, httpPort, securePort, mode));
setupBasicAuthIfNeeded(server);
webapp = new WebAppContext();
webapp.setParentLoaderPriority(true);
+ webapp.setConfigurationDiscovered(true);
webapp.setContextPath(webHome);
webapp.setWar(webAppDir);
- SessionManager sm = webapp.getSessionHandler().getSessionManager();
- sm.setMaxInactiveInterval(10 * 60);
- sm.setSessionCookie(sm.getSessionCookie() + (httpPort > 0 ? httpPort : securePort));
- webapp.getServletContext().getContextHandler().setMaxFormContentSize(Integer.parseInt(System.getProperty("org.eclipse.jetty.server.Request.maxFormContentSize", "800000")));
- webapp.getServletContext().getContextHandler().setMaxFormKeys(Integer.parseInt(System.getProperty("org.eclipse.jetty.server.Request.maxFormKeys", "100000")));
+ webapp.setResourceBase(webAppDir);
+ // webapp.addServlet(DefaultServlet.class, "/*");
+
+ SessionManager sm = new SessionManager();
+ webapp.getSessionHandler().setSessionManager(sm);
+
+ webapp.getServletContext().getContextHandler()
+ .setMaxFormContentSize(Integer.parseInt(System.getProperty("org.eclipse.jetty.server.Request.maxFormContentSize", "800000")));
+ webapp.getServletContext().getContextHandler()
+ .setMaxFormKeys(Integer.parseInt(System.getProperty("org.eclipse.jetty.server.Request.maxFormKeys", "100000")));
if (propertiesFile != null) {
- webapp.getServletContext().getContextHandler().setInitParameter(
- WebConstants.INIT_SINGLE_SERVER_PROPERTIES_FILE, propertiesFile);
- webapp.getServletContext().getContextHandler().setInitParameter(WebConstants.INIT_PARAM_MULTI_SERVER_MODE,
- Boolean.toString(false));
+ webapp.getServletContext().getContextHandler().setInitParameter(WebConstants.INIT_SINGLE_SERVER_PROPERTIES_FILE, propertiesFile);
+ webapp.getServletContext().getContextHandler()
+ .setInitParameter(WebConstants.INIT_PARAM_MULTI_SERVER_MODE, Boolean.toString(false));
} else {
- webapp.getServletContext().getContextHandler().setInitParameter(WebConstants.INIT_PARAM_MULTI_SERVER_MODE,
- Boolean.toString(true));
+ webapp.getServletContext().getContextHandler()
+ .setInitParameter(WebConstants.INIT_PARAM_MULTI_SERVER_MODE, Boolean.toString(true));
}
server.setHandler(webapp);
@@ -282,8 +296,7 @@ protected ServletContext getServletContext() {
public RestService getRestService() {
ServletContext servletContext = getServletContext();
- WebApplicationContext rootContext =
- WebApplicationContextUtils.getWebApplicationContext(servletContext);
+ WebApplicationContext rootContext = WebApplicationContextUtils.getWebApplicationContext(servletContext);
return rootContext.getBean(RestService.class);
}
@@ -291,15 +304,13 @@ public ISymmetricEngine getEngine() {
ISymmetricEngine engine = null;
ServletContext servletContext = getServletContext();
if (servletContext != null) {
- SymmetricEngineHolder engineHolder = ServletUtils
- .getSymmetricEngineHolder(servletContext);
+ SymmetricEngineHolder engineHolder = ServletUtils.getSymmetricEngineHolder(servletContext);
if (engineHolder != null) {
if (engineHolder.getEngines().size() == 1) {
return engineHolder.getEngines().values().iterator().next();
} else {
- throw new IllegalStateException(
- "Could not choose a single engine to return. There are "
- + engineHolder.getEngines().size() + " engines configured.");
+ throw new IllegalStateException("Could not choose a single engine to return. There are "
+ + engineHolder.getEngines().size() + " engines configured.");
}
}
}
@@ -310,8 +321,7 @@ public void waitForEnginesToComeOnline(long maxWaitTimeInMs) throws InterruptedE
long startTime = System.currentTimeMillis();
ServletContext servletContext = getServletContext();
if (servletContext != null) {
- SymmetricEngineHolder engineHolder = ServletUtils
- .getSymmetricEngineHolder(servletContext);
+ SymmetricEngineHolder engineHolder = ServletUtils.getSymmetricEngineHolder(servletContext);
while (engineHolder.areEnginesStarting()) {
AppUtils.sleep(500);
if ((System.currentTimeMillis() - startTime) > maxWaitTimeInMs) {
@@ -348,47 +358,53 @@ protected void setupBasicAuthIfNeeded(Server server) {
}
}
- protected Connector[] getConnectors(int port, int securePort, Mode mode) {
+ protected Connector[] getConnectors(Server server, int port, int securePort, Mode mode) {
ArrayList connectors = new ArrayList();
String keyStoreFile = System.getProperty(SecurityConstants.SYSPROP_KEYSTORE);
- String keyStoreType = System.getProperty(SystemConstants.SYSPROP_KEYSTORE_TYPE, SecurityConstants.KEYSTORE_TYPE);
+ String keyStoreType = System.getProperty(SecurityConstants.SYSPROP_KEYSTORE_TYPE, SecurityConstants.KEYSTORE_TYPE);
+
+ HttpConfiguration httpConfig = new HttpConfiguration();
+ if (mode.equals(Mode.HTTPS) || mode.equals(Mode.MIXED)) {
+ httpConfig.setSecureScheme("https");
+ httpConfig.setSecurePort(securePort);
+ }
+
+ httpConfig.setOutputBufferSize(32768);
if (mode.equals(Mode.HTTP) || mode.equals(Mode.MIXED)) {
- Connector connector = null;
- if (noNio) {
- connector = new SocketConnector();
- } else {
- SelectChannelConnector nioConnector = new SelectChannelConnector();
- nioConnector.setUseDirectBuffers(!noDirectBuffer);
- connector = nioConnector;
- }
- connector.setPort(port);
- connector.setHost(host);
- connector.setMaxIdleTime(maxIdleTime);
- connectors.add(connector);
+ ServerConnector http = new ServerConnector(server, new HttpConnectionFactory(httpConfig));
+ http.setPort(port);
+ http.setHost(host);
+ http.setIdleTimeout(maxIdleTime);
+ connectors.add(http);
log.info(String.format("About to start %s web server on host:port %s:%s", name, host == null ? "default" : host, port));
}
if (mode.equals(Mode.HTTPS) || mode.equals(Mode.MIXED)) {
- ISecurityService securityService = SecurityServiceFactory.create(SecurityServiceType.SERVER, new TypedProperties(System.getProperties()));
+ ISecurityService securityService = SecurityServiceFactory.create(SecurityServiceType.SERVER,
+ new TypedProperties(System.getProperties()));
securityService.installDefaultSslCert(host);
- Connector connector = new SslSocketConnector();
- String keyStorePassword = System
- .getProperty(SecurityConstants.SYSPROP_KEYSTORE_PASSWORD);
- keyStorePassword = (keyStorePassword != null) ? keyStorePassword
- : SecurityConstants.KEYSTORE_PASSWORD;
- SslContextFactory sslConnectorFactory = ((SslSocketConnector) connector).getSslContextFactory();
+ String keyStorePassword = System.getProperty(SecurityConstants.SYSPROP_KEYSTORE_PASSWORD);
+ keyStorePassword = (keyStorePassword != null) ? keyStorePassword : SecurityConstants.KEYSTORE_PASSWORD;
+ SslContextFactory sslConnectorFactory = new SslContextFactory();
sslConnectorFactory.setKeyStorePath(keyStoreFile);
sslConnectorFactory.setKeyManagerPassword(keyStorePassword);
/* Prevent POODLE attack */
sslConnectorFactory.addExcludeProtocols("SSLv3");
- sslConnectorFactory.setCertAlias(System.getProperty(SystemConstants.SYSPROP_KEYSTORE_CERT_ALIAS, SecurityConstants.ALIAS_SYM_PRIVATE_KEY));
+ sslConnectorFactory.setCertAlias(System.getProperty(SecurityConstants.SYSPROP_KEYSTORE_CERT_ALIAS,
+ SecurityConstants.ALIAS_SYM_PRIVATE_KEY));
sslConnectorFactory.setKeyStoreType(keyStoreType);
- ((SslSocketConnector) connector).setMaxIdleTime(maxIdleTime);
- connector.setPort(securePort);
- connector.setHost(host);
- connectors.add(connector);
- log.info(String.format("About to start %s web server on secure host:port %s:%s", name, host == null ? "default" : host, securePort));
+ HttpConfiguration httpsConfig = new HttpConfiguration(httpConfig);
+ httpsConfig.addCustomizer(new SecureRequestCustomizer());
+
+ ServerConnector https = new ServerConnector(server,
+ new SslConnectionFactory(sslConnectorFactory, HttpVersion.HTTP_1_1.asString()), new HttpConnectionFactory(httpsConfig));
+ https.setPort(securePort);
+ https.setIdleTimeout(maxIdleTime);
+ https.setHost(host);
+ connectors.add(https);
+ log.info(String.format("About to start %s web server on secure host:port %s:%s", name, host == null ? "default" : host,
+ securePort));
}
return connectors.toArray(new Connector[connectors.size()]);
}
@@ -399,8 +415,7 @@ protected void registerHttpJmxAdaptor(int jmxPort) throws Exception {
MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName name = getHttpJmxAdaptorName();
mbeanServer.createMBean(HttpAdaptor.class.getName(), name);
- if (!AppUtils.isSystemPropertySet(SystemConstants.SYSPROP_JMX_HTTP_CONSOLE_LOCALHOST_ENABLED,
- true)) {
+ if (!AppUtils.isSystemPropertySet(SystemConstants.SYSPROP_JMX_HTTP_CONSOLE_LOCALHOST_ENABLED, true)) {
mbeanServer.setAttribute(name, new Attribute("Host", "0.0.0.0"));
} else if (StringUtils.isNotBlank(host)) {
mbeanServer.setAttribute(name, new Attribute("Host", host));
@@ -506,7 +521,6 @@ public void setNoNio(boolean noNio) {
this.noNio = noNio;
}
-
public boolean isNoNio() {
return noNio;
}
@@ -559,5 +573,128 @@ public boolean isJmxEnabled() {
return jmxEnabled;
}
+ class SessionManager extends HashSessionManager {
+
+ public SessionManager() {
+ setMaxInactiveInterval(10 * 60);
+ setLazyLoad(true);
+ setDeleteUnrestorableSessions(true);
+ setSessionCookie(getSessionCookie() + (httpPort > 0 ? httpPort
+ : httpsPort));
+ }
+
+ @Override
+ protected AbstractSession newSession(HttpServletRequest request) {
+ return new Session(this, request);
+ }
+
+ @Override
+ protected AbstractSession newSession(long created, long accessed, String clusterId) {
+ return new Session(this, created, accessed, clusterId);
+ }
+
+ @Override
+ protected synchronized HashedSession restoreSession(String idInCuster) {
+ if (isNotBlank(idInCuster)) {
+ return super.restoreSession(idInCuster);
+ } else {
+ return null;
+ }
+ }
+
+ public HashedSession restoreSession(InputStream is, HashedSession session) throws Exception {
+ DataInputStream di = new DataInputStream(is);
+
+ String clusterId = di.readUTF();
+ di.readUTF(); // nodeId
+
+ long created = di.readLong();
+ long accessed = di.readLong();
+ int requests = di.readInt();
+
+ if (session == null)
+ session = (HashedSession) newSession(created, accessed, clusterId);
+ session.setRequests(requests);
+
+ int size = di.readInt();
+
+ restoreSessionAttributes(di, size, session);
+
+ try {
+ int maxIdle = di.readInt();
+ session.setMaxInactiveInterval(maxIdle);
+ } catch (EOFException e) {
+ log.debug("No maxInactiveInterval persisted for session " + clusterId, e);
+ }
+
+ return session;
+ }
+
+ private void restoreSessionAttributes(InputStream is, int size, HashedSession session) throws Exception {
+ if (size > 0) {
+ ObjectInputStream ois = new ObjectInputStream(is);
+ for (int i = 0; i < size; i++) {
+ String key = ois.readUTF();
+ try {
+ Object value = ois.readObject();
+ session.setAttribute(key, value);
+ } catch (Exception ex) {
+ if (ex instanceof ClassCastException || ex instanceof ClassNotFoundException) {
+ log.warn("Could not restore the '" + key
+ + "' session object. Code has probably changed. The error message was: " + ex.getMessage());
+ } else {
+ log.error("Could not restore the '" + key + "' session object.", ex);
+ }
+ }
+ }
+ }
+ }
+
+ }
+
+ class Session extends HashedSession {
+
+ protected Session(HashSessionManager hashSessionManager, HttpServletRequest request) {
+ super(hashSessionManager, request);
+ }
+
+ protected Session(HashSessionManager hashSessionManager, long created, long accessed, String clusterId) {
+ super(hashSessionManager, created, accessed, clusterId);
+ }
+
+ @Override
+ public synchronized void save(OutputStream os) throws IOException {
+ DataOutputStream out = new DataOutputStream(os);
+ out.writeUTF(getClusterId());
+ out.writeUTF(getNodeId());
+ out.writeLong(getCreationTime());
+ out.writeLong(getAccessed());
+ out.writeInt(getRequests());
+
+ Enumeration e = getAttributeNames();
+ int count = 0;
+ while (e.hasMoreElements()) {
+ String key = e.nextElement();
+ Object obj = doGet(key);
+ if (obj instanceof Serializable) {
+ count++;
+ }
+ }
+ out.writeInt(count);
+ ObjectOutputStream oos = new ObjectOutputStream(out);
+ e = getAttributeNames();
+ while (e.hasMoreElements()) {
+ String key = e.nextElement();
+ Object obj = doGet(key);
+ if (obj instanceof Serializable) {
+ oos.writeUTF(key);
+ oos.writeObject(obj);
+ }
+ }
+ oos.flush();
+ out.writeInt(getMaxInactiveInterval());
+ }
+
+ }
}
diff --git a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/NodeConcurrencyInterceptor.java b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/NodeConcurrencyInterceptor.java
index a2868dd279..61969a746d 100644
--- a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/NodeConcurrencyInterceptor.java
+++ b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/NodeConcurrencyInterceptor.java
@@ -73,14 +73,14 @@ public boolean before(HttpServletRequest req, HttpServletResponse resp) throws I
if (!concurrentConnectionManager
.reserveConnection(nodeId, poolId, ReservationType.SOFT)) {
statisticManager.incrementNodesRejected(1);
- ServletUtils.sendError(resp, WebConstants.SC_SERVICE_UNAVAILABLE);
+ ServletUtils.sendError(resp, WebConstants.SC_SERVICE_BUSY);
} else {
try {
buildSuspendIgnoreResponseHeaders(nodeId, resp);
} catch (Exception ex) {
concurrentConnectionManager.releaseConnection(nodeId, poolId);
log.error("Error building response headers", ex);
- ServletUtils.sendError(resp, WebConstants.SC_SERVICE_UNAVAILABLE);
+ ServletUtils.sendError(resp, WebConstants.SC_SERVICE_BUSY);
}
}
return false;
@@ -92,13 +92,13 @@ public boolean before(HttpServletRequest req, HttpServletResponse resp) throws I
} catch (Exception ex) {
concurrentConnectionManager.releaseConnection(nodeId, poolId);
log.error("Error building response headers", ex);
- ServletUtils.sendError(resp, WebConstants.SC_SERVICE_UNAVAILABLE);
+ ServletUtils.sendError(resp, WebConstants.SC_SERVICE_BUSY);
return false;
}
} else {
statisticManager.incrementNodesRejected(1);
- ServletUtils.sendError(resp, WebConstants.SC_SERVICE_UNAVAILABLE);
+ ServletUtils.sendError(resp, WebConstants.SC_SERVICE_BUSY);
return false;
}
}
diff --git a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/compression/CompressionResponseStream.java b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/compression/CompressionResponseStream.java
index 18da3236d7..dabddc79a3 100644
--- a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/compression/CompressionResponseStream.java
+++ b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/compression/CompressionResponseStream.java
@@ -23,10 +23,11 @@
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;
-
+
import javax.servlet.ServletOutputStream;
+import javax.servlet.WriteListener;
import javax.servlet.http.HttpServletResponse;
-
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -166,6 +167,12 @@ public void write(byte b[], int off, int len) throws IOException {
gzipstream.write(b, off, len);
}
+
+ @Override
+ public boolean isReady() {
+ return true;
+ }
+
/**
* Has this response stream been closed?
@@ -174,4 +181,8 @@ public boolean closed() {
return this.closed;
}
+ @Override
+ public void setWriteListener(WriteListener writeListener) {
+ }
+
}
\ No newline at end of file