Skip to content

Commit

Permalink
Merge branch '3.7' of https://github.com/JumpMind/symmetric-ds.git in…
Browse files Browse the repository at this point in the history
…to 3.7
  • Loading branch information
erilong committed Oct 1, 2015
2 parents 7daabdd + 0797f05 commit 8f8f088
Show file tree
Hide file tree
Showing 57 changed files with 1,018 additions and 358 deletions.
2 changes: 2 additions & 0 deletions .gitattributes
@@ -0,0 +1,2 @@
*.csv text eol=lf

6 changes: 3 additions & 3 deletions symmetric-assemble/build.gradle
Expand Up @@ -209,8 +209,8 @@ project(':symmetric-server') {
exclude group: 'com.fasterxml.jackson.core'
exclude group: 'commons-codec'
}
provided "javax.servlet:servlet-api:2.5"
provided "org.eclipse.jetty:jetty-server:$jettyVersion"
compile "javax.servlet:javax.servlet-api:$servletVersion"
provided "org.eclipse.jetty:jetty-annotations:$jettyVersion"
provided "org.eclipse.jetty:jetty-servlets:$jettyVersion"
provided "org.eclipse.jetty:jetty-webapp:$jettyVersion"
provided "org.eclipse.jetty:jetty-jmx:$jettyVersion"
Expand Down Expand Up @@ -323,4 +323,4 @@ task releaseSymmetric {
dependsOn publishDoc
}

task wrapper(type: Wrapper) { gradleVersion = '2.2.1' }
task wrapper(type: Wrapper) { gradleVersion = '2.7' }
4 changes: 2 additions & 2 deletions symmetric-assemble/common.gradle
Expand Up @@ -138,13 +138,13 @@ subprojects { subproject ->
mockitoVersion = '1.9.5'
powerMockVersion = '1.5.3'
mysqlVersion = '5.1.30'
servletVersion = '3.0.1'
servletVersion = '3.1.0'
springVersion = '4.0.5.RELEASE'
jtdsVersion = '1.2.8'
bouncyCastleVersion = '140'
animalSnifferVersion = '1.10'
jnaVersion = '4.1.0'
jettyVersion = '7.6.3.v20120416'
jettyVersion = '9.3.3.v20150827'

env = System.getenv()
}
Expand Down
21 changes: 0 additions & 21 deletions symmetric-assemble/pom.xml
Expand Up @@ -102,27 +102,6 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.db.torque</groupId>
<artifactId>torque-maven-plugin</artifactId>
<version>3.3-RC2</version>
<configuration>
<outputFormat>docbook</outputFormat>
<targetDatabase>symmetric</targetDatabase>
<outputDir>${docbook.build}</outputDir>
<schemaDir>${basedir}/../symmetric-core/src/main/resources</schemaDir>
<templatePath>${basedir}/src/torque</templatePath>
<useClasspath>false</useClasspath>
</configuration>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>documentation</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
Expand Down
Expand Up @@ -15,7 +15,7 @@ endif::pro[]

Filter Id:: The unique identifier for the load filter
Group Link:: The group link for with the load filter will be applied.
Type:: The type of load filter. Today only Bean Shell and Java are supported ('BSH', 'Java'), but SQL scripts may be added in a future release.
Type:: The type of load filter. Today only Bean Shell and Java are supported ('BSH', 'Java', 'SQL').
Target Table:: The table on the target which the load filter will execute when changes occur on it.

NOTE: Use the wildcard * to specify all tables configured through the group link. Partial table names in conjunction with a wildcard
Expand Down
Expand Up @@ -29,15 +29,15 @@ Handle Error Script:: A script to execute if data cannot be processed.
.Variables available within scripts
[cols="3,^1,^1,5"]
|===
|Variable|BSH|JAVA|Description

|engine|X||The Symmetric engine object.
|COLUMN_NAME|X||The source values for the row being inserted, updated or deleted.
|OLD_COLUMN_NAME|X||The old values for the row being inserted, updated or deleted.
|context|X|X|The data context object for the data being inserted, updated or deleted. .
|table|X|X|The table object for the table being inserted, updated or deleted.
|data|X|X|The `CsvData` object for the data change.
|error|X|X|`java.lang.Exception`
|Variable|BSH|SQL|JAVA|Description

|engine|X|||The Symmetric engine object.
|COLUMN_NAME|X|X||The source values for the row being inserted, updated or deleted.
|OLD_COLUMN_NAME|X|X||The old values for the row being inserted, updated or deleted.
|context|X||X|The data context object for the data being inserted, updated or deleted. .
|table|X||X|The table object for the table being inserted, updated or deleted.
|data|X||X|The `CsvData` object for the data change.
|error|X||X|`java.lang.Exception`

|===

Expand Down
Expand Up @@ -112,7 +112,7 @@ ifndef::pro[]
----
insert into SYM_TRANSFORM_TABLE (
transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name,
target_table_name, delete_action, transform_order, column_policy, update_first,
target_table_name, update_action, delete_action, transform_order, column_policy, update_first,
last_update_by, last_update_time, create_time
) values (
'itemSellingPriceTransform', 'store', 'corp', 'EXTRACT', 'ITEM_SELLING_PRICE',
Expand Down
Expand Up @@ -26,7 +26,7 @@ ifndef::pro[]
----
insert into SYM_TRANSFORM_TABLE (
transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name,
target_table_name, delete_action, transform_order, column_policy, update_first,
target_table_name, update_action, delete_action, transform_order, column_policy, update_first,
last_update_time, create_time
) values (
'update-first', 'store', 'corp', 'EXTRACT', 'ITEM_SELLING_PRICE',
Expand All @@ -48,7 +48,7 @@ ifndef::pro[]
----
insert into SYM_TRANSFORM_TABLE (
transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name,
target_table_name, delete_action, transform_order, column_policy, update_first,
target_table_name, update_action, delete_action, transform_order, column_policy, update_first,
last_update_time, create_time
) values (
'delete-action-update-col', 'store', 'corp', 'EXTRACT', 'ITEM_SELLING_PRICE',
Expand Down
Expand Up @@ -198,6 +198,8 @@ This transform copies the left most number of bytes specified.
This transformation determines the target column value by using a query, contained in transform expression
to lookup the value in another table. The query must return a single row, and the first column of the query
is used as the value. Your query references source column names by prefixing with a colon (e.g., :MY_COLUMN).
Additional you can reference old values with :OLD_COLUMN and previously transformed columns (see transform order) with
:TRM_COLUMN.

ifndef::pro[]
[source, SQL]
Expand Down
Expand Up @@ -22,7 +22,7 @@ ifndef::pro[]
----
insert into SYM_TRANSFORM_TABLE (
transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name,
target_table_name, delete_action, transform_order, column_policy, update_first,
target_table_name, update_action, delete_action, transform_order, column_policy, update_first,
last_update_by, last_update_time, create_time
) values (
'extractStoreItemSellingPriceTransform', 'store', 'corp', 'EXTRACT', 'ITEM_SELLING_PRICE',
Expand All @@ -35,7 +35,7 @@ insert into SYM_TRANSFORM_TABLE (
----
insert into SYM_TRANSFORM_TABLE (
transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name,
target_table_name, delete_action, transform_order, column_policy, update_first,
target_table_name, update_action, delete_action, transform_order, column_policy, update_first,
last_update_by, last_update_time, create_time
) values (
'loadCorpItemSellingPriceTransform', 'corp', 'store', 'LOAD', 'ITEM_SELLING_PRICE',
Expand Down
21 changes: 21 additions & 0 deletions symmetric-assemble/src/docbook/configuration.xml
Expand Up @@ -1242,6 +1242,27 @@ column.</listitem>
</itemizedlist>
</listitem>

<listitem>
update_action: When a source operation of Update takes place, there are
three possible ways to handle the transformation at the target. The
options include:
<itemizedlist>
<listitem>NONE - The update results in no target changes.</listitem>

<listitem>DEL_ROW - The update results in a delete of the row
as specified by the pk columns defined in the transformation
configuration.</listitem>

<listitem>UPDATE_COL - The update results in an Update
operation on the target which updates the specific rows and columns
based on the defined transformation.</listitem>

<listitem>BeanShell Script Transform ('bsh'):
script code which returns one of the above items.
you can use COLUMN variables inside the script.</listitem>
</itemizedlist>
</listitem>

<listitem>
delete_action: When a source operation of Delete takes place, there are
three possible ways to handle the transformation at the target. The
Expand Down
7 changes: 7 additions & 0 deletions symmetric-client/pom.xml
Expand Up @@ -135,11 +135,18 @@
<artifactId>postgresql</artifactId>
<scope>provided</scope>
</dependency>
<!--
<dependency>
<groupId>org.jumpmind.symmetric.jdbc</groupId>
<artifactId>ojdbc</artifactId>
<scope>provided</scope>
</dependency>
-->
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>net.sourceforge.jtds</groupId>
<artifactId>jtds</artifactId>
Expand Down
Expand Up @@ -66,7 +66,7 @@
import org.jumpmind.symmetric.db.postgresql.PostgreSqlSymmetricDialect;
import org.jumpmind.symmetric.db.redshift.RedshiftSymmetricDialect;
import org.jumpmind.symmetric.db.sqlanywhere.SqlAnywhereSymmetricDialect;
import org.jumpmind.symmetric.db.sqlite.SqliteSymmetricDialect;
import org.jumpmind.symmetric.db.sqlite.SqliteJdbcSymmetricDialect;
import org.jumpmind.symmetric.service.IParameterService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -149,7 +149,7 @@ public ISymmetricDialect create() {
} else if (platform instanceof InterbaseDatabasePlatform) {
dialect = new InterbaseSymmetricDialect(parameterService, platform);
} else if (platform instanceof SqliteDatabasePlatform) {
dialect = new SqliteSymmetricDialect(parameterService, platform);
dialect = new SqliteJdbcSymmetricDialect(parameterService, platform);
} else {
throw new DbNotSupportedException();
}
Expand Down
@@ -0,0 +1,38 @@
package org.jumpmind.symmetric.db.sqlite;

import java.sql.Connection;
import java.sql.SQLException;

import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.sql.IConnectionCallback;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.JdbcSqlTransaction;
import org.jumpmind.symmetric.service.IParameterService;

public class SqliteJdbcSymmetricDialect extends SqliteSymmetricDialect {

public SqliteJdbcSymmetricDialect(IParameterService parameterService, IDatabasePlatform platform) {
super(parameterService, platform);
}

@Override
protected void setSqliteFunctionResult(ISqlTransaction transaction, final String name, final String result){
JdbcSqlTransaction trans = (JdbcSqlTransaction)transaction;
trans.executeCallback(new IConnectionCallback<Object>() {
@Override
public Object execute(Connection con) throws SQLException {
org.sqlite.SQLiteConnection unwrapped = ((org.sqlite.SQLiteConnection)((org.apache.commons.dbcp.DelegatingConnection)con).getInnermostDelegate());

org.sqlite.Function.create(unwrapped, name, new org.sqlite.Function() {
@Override
protected void xFunc() throws SQLException {
this.result(result);
}
});

return null;
}
});
}

}
Expand Up @@ -28,18 +28,17 @@
import org.jumpmind.extension.IBuiltInExtensionPoint;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.ext.ISymmetricEngineAware;
import org.jumpmind.symmetric.io.OracleBulkDatabaseWriter;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.writer.Conflict;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
import org.jumpmind.symmetric.io.data.writer.ResolvedData;
import org.jumpmind.symmetric.io.data.writer.TransformWriter;
import org.jumpmind.symmetric.load.IDataLoaderFactory;
import org.jumpmind.symmetric.load.DefaultDataLoaderFactory;
import org.springframework.jdbc.support.nativejdbc.NativeJdbcExtractor;

public class OracleBulkDataLoaderFactory implements IDataLoaderFactory, ISymmetricEngineAware,
public class OracleBulkDataLoaderFactory extends DefaultDataLoaderFactory implements ISymmetricEngineAware,
IBuiltInExtensionPoint {

private ISymmetricEngine engine;
Expand All @@ -60,11 +59,12 @@ public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetri
int maxRowsBeforeFlush = engine.getParameterService().getInt(
"oracle.bulk.load.max.rows.before.flush", 1000);
return new OracleBulkDatabaseWriter(symmetricDialect.getPlatform(), engine.getTablePrefix(),
jdbcExtractor, maxRowsBeforeFlush);
jdbcExtractor, maxRowsBeforeFlush, buildDatabaseWriterSettings(filters, errorHandlers, conflictSettings, resolvedData));
}

public void setSymmetricEngine(ISymmetricEngine engine) {
this.engine = engine;
this.parameterService = engine.getParameterService();
}

public boolean isPlatformSupported(IDatabasePlatform platform) {
Expand Down
Expand Up @@ -42,6 +42,7 @@
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter;
import org.springframework.jdbc.support.nativejdbc.NativeJdbcExtractor;

Expand All @@ -58,8 +59,8 @@ public class OracleBulkDatabaseWriter extends DefaultDatabaseWriter {
protected List<List<Object>> rowArrays = new ArrayList<List<Object>>();

public OracleBulkDatabaseWriter(IDatabasePlatform platform, String procedurePrefix,
NativeJdbcExtractor jdbcExtractor, int maxRowsBeforeFlush) {
super(platform);
NativeJdbcExtractor jdbcExtractor, int maxRowsBeforeFlush, DatabaseWriterSettings settings) {
super(platform, settings);
this.procedurePrefix = procedurePrefix;
this.jdbcExtractor = jdbcExtractor;
this.maxRowsBeforeFlush = maxRowsBeforeFlush;
Expand Down Expand Up @@ -89,23 +90,26 @@ public void write(CsvData data) {
case INSERT:
statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT);
statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
Object[] rowData = platform.getObjectValues(batch.getBinaryEncoding(),
getRowData(data, CsvData.ROW_DATA), targetTable.getColumns());
for (int i = 0; i < rowData.length; i++) {

List<Object> columnList = null;
if (rowArrays.size() > i) {
columnList = rowArrays.get(i);
} else {
columnList = new ArrayList<Object>();
rowArrays.add(columnList);
if (filterBefore(data)) {
Object[] rowData = platform.getObjectValues(batch.getBinaryEncoding(), getRowData(data, CsvData.ROW_DATA),
targetTable.getColumns());
for (int i = 0; i < rowData.length; i++) {

List<Object> columnList = null;
if (rowArrays.size() > i) {
columnList = rowArrays.get(i);
} else {
columnList = new ArrayList<Object>();
rowArrays.add(columnList);
}
columnList.add(rowData[i]);

if (columnList.size() >= maxRowsBeforeFlush) {
requiresFlush = true;
}
}
columnList.add(rowData[i]);

if (columnList.size() >= maxRowsBeforeFlush) {
requiresFlush = true;
}
}
uncommittedCount++;
}
break;
case UPDATE:
super.write(data);
Expand All @@ -121,6 +125,8 @@ public void write(CsvData data) {
if (requiresFlush) {
flush();
}

checkForEarlyCommit();
}

protected void flush() {
Expand Down
Expand Up @@ -148,7 +148,7 @@ public void exportTestDatabaseSQL() throws Exception {
String output = export.exportTables(tables).toLowerCase();

Assert.assertEquals(output, 42, StringUtils.countMatches(output, "create table \"sym_"));
final int EXPECTED_VARCHAR_MAX = engine.getDatabasePlatform().getName().equals(DatabaseNamesConstants.SQLITE) ? 264 : 43;
final int EXPECTED_VARCHAR_MAX = engine.getDatabasePlatform().getName().equals(DatabaseNamesConstants.SQLITE) ? 265 : 43;
final String EXPECTED_STRING = "varchar(" + Integer.MAX_VALUE + ")";
Assert.assertEquals("Expected " + EXPECTED_VARCHAR_MAX + " " + EXPECTED_STRING
+ " in the following output: " + output, EXPECTED_VARCHAR_MAX,
Expand Down

0 comments on commit 8f8f088

Please sign in to comment.