
Commit

Merge branch '3.10' of https://github.com/JumpMind/symmetric-ds.git into 3.10
jumpmind-josh committed Mar 8, 2019
2 parents 27749d0 + 5660dee commit 243f568
Showing 59 changed files with 878 additions and 522 deletions.
2 changes: 2 additions & 0 deletions README.md
@@ -1,5 +1,7 @@
<a href="https://sourceforge.net/projects/symmetricds/files/latest/download" rel="nofollow"><img alt="Download SymmetricDS" src="https://img.shields.io/sourceforge/dt/symmetricds.svg"></a>

[![Build Status](https://dev.azure.com/jumpmind/symmetricds/_apis/build/status/SymmetricDS?branchName=3.10)](https://dev.azure.com/jumpmind/symmetricds/_build/latest?definitionId=4&branchName=3.10)

# SymmetricDS
SymmetricDS is a database and file synchronization solution that is platform-independent, web-enabled, and database agnostic. SymmetricDS was built to make data replication across two to tens of thousands of databases and file systems fast, easy, and resilient. We specialize in near-real-time, bidirectional data replication across large node networks over the WAN or LAN.

1 change: 1 addition & 0 deletions symmetric-assemble/src/asciidoc/appendix/mysql.ad
@@ -28,6 +28,7 @@ On MySQL 5.1, the SymmetricDS user needs the TRIGGER, PROCESS, and CREATE ROUTIN
----
grant trigger on *.* to symmetric;
grant create routine on *.* to symmetric;
grant alter routine on *.* to symmetric;
grant process on *.* to symmetric;
----

4 changes: 1 addition & 3 deletions symmetric-assemble/src/asciidoc/configuration/channels.ad
@@ -70,9 +70,7 @@ File Sync Channel:: Indicates whether a channel is available for file synchroniz
Use Old Data To Route:: Indicates if the old data will be included for routing. Routing can then use this data for processing. Defaults to true.
Use Row Data To Route:: Indicates if the current data will be included for routing. Routing can then use this data for processing. Defaults to true.
Use Primary Key (PK) Data to Route:: Indicates if the primary key data will be included for routing. For example, a store ID might be needed to apply logic before sending to the appropriate target nodes. Defaults to true.
Tables Contain Big Lobs:: Indicates whether the channel contains tables with LOB columns exceeding the character limit for the database platform.
Some databases have shortcuts that SymmetricDS can take advantage of if it knows that the lob columns in SYM_DATA aren't going to contain large lobs.
The definition of how large a 'big' lob is varies from database to database.
Contains Lob or Wide Row Data:: For Oracle, Tibero, Firebird, and Interbase, this setting can be enabled when change data capture exceeds the character limit. Oracle and Tibero have a character limit of 4000, while Firebird and Interbase have a limit of 20000 for changes and 1000 for primary key values. Change data is first extracted as character data for better performance, then automatically falls back to extraction as a large object (LOB). Enable this setting when most changes captured on the channel need to be extracted as LOBs or when extraction is hitting truncation errors.
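The fallback described above can be sketched as a small helper (the class, method, and platform-name strings are hypothetical illustrations, not SymmetricDS code; the limits are the ones stated in the text, applied to change data rather than primary key values):

```java
public class LobLimits {
    // Character limits stated in the documentation above for change data
    // (primary key values have a separate 1000-char limit on Firebird/Interbase).
    public static int changeCaptureCharLimit(String platform) {
        switch (platform.toLowerCase()) {
            case "oracle":
            case "tibero":
                return 4000;
            case "firebird":
            case "interbase":
                return 20000;
            default:
                return Integer.MAX_VALUE; // other platforms: no fallback needed
        }
    }

    // True when the captured change is too wide for character extraction
    // and must fall back to LOB extraction.
    public static boolean needsLobExtraction(String platform, int changeDataLength) {
        return changeDataLength > changeCaptureCharLimit(platform);
    }
}
```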

.Sample Channels
=====
10 changes: 5 additions & 5 deletions symmetric-assemble/src/asciidoc/configuration/table-triggers.ad
@@ -71,13 +71,13 @@ Sync Condition Example:: Sync Conditions can access both old values and new valu
Custom Insert Trigger Text:: Specify insert trigger text (SQL) to execute after the SymmetricDS trigger fires. This field is not applicable for H2, HSQLDB 1.x or Apache Derby.
Custom Update Trigger Text:: Specify update trigger text (SQL) to execute after the SymmetricDS trigger fires. This field is not applicable for H2, HSQLDB 1.x or Apache Derby.
Custom Delete Trigger Text:: Specify delete trigger text (SQL) to execute after the SymmetricDS trigger fires. This field is not applicable for H2, HSQLDB 1.x or Apache Derby.
Sync On Incoming:: Whether or not an incoming batch that loads data into this table should cause the triggers to capture data_events. Be careful turning this on, because an update loop is possible.
Stream Lobs:: Specifies to capture an empty placeholder for LOB data types when a row is changed, then query for the LOB value later when the batch is extracted.
It usually performs better to leave this disabled, which means it will capture LOB data types when a row is changed.
Capture Lobs:: Provides a hint that the table may have row data that is wider than the character limit imposed by the database, requiring the trigger to capture changes as a LOB type.
Sync On Incoming:: Whether or not an incoming batch that loads data into this table should cause the triggers to capture changes. Changes are never sent back to the source node, but enabling this setting makes it possible to create a never-ending loop through intermediary nodes, such as A to B to C to A.
Capture Row As LOB:: For Oracle, Tibero, and DB2, row data that exceeds the character limit needs to be converted into a large object (LOB) for capture. Oracle and Tibero have a character limit of 4000, while DB2 has a character limit of 32767. The trigger will try to capture the row correctly on its own, so only enable this setting if you are getting truncation errors during trigger creation or when changing data.
Stream LOBs:: Captures an empty placeholder for large object (LOB) data types when a row is changed, then queries for the LOB value later when the batch is extracted. If normal capturing of LOBs is not working, enabling this setting may work instead. When very large LOB data is involved, this setting can reduce the overhead of making changes in the database, but it usually results in worse performance of synchronization since it queries each row during extraction.
Stream Row:: Captures only the primary key values when the trigger fires, which can reduce overhead for tables with wide data or many columns. The data will be queried using the PK values when the batch is extracted. This results in worse performance of synchronization, but it can be used when triggers for all columns won't install or when contention from triggers is too high.
Capture Old Data:: Indicates whether this trigger should capture and send the old data, which is the previous state of the row before the change.
Enable this option if you need to access old data in custom trigger text, routing expression, or transform expression. Otherwise, disable this option for better performance.
Stream Row:: Captures only the primary key when the trigger fires, which can reduce overhead for tables with wide data or many columns. The data will be queried using the PK values when the batch is extracted.
Handle Key Updates:: For SQL Server and Sybase, enable this setting to capture changes to the primary key. The trigger must do additional work to handle changes to the primary key, so this setting is normally disabled.
External Select:: Specify a SQL select statement that returns a single row, single column result.
It will be used in the generated database trigger to populate the EXTERNAL_DATA field on the data table. See
Excluded Column Names:: Specify a comma-delimited list of columns that should not be synchronized from this table.
2 changes: 1 addition & 1 deletion symmetric-assemble/src/asciidoc/manage/monitors.ad
@@ -18,7 +18,7 @@ Different monitor types can check the CPU usage, disk usage, memory usage, batch
of data gaps.
Custom monitor types can be created using <<_extensions,Extensions>> that use the IMonitorType interface.
When the value returned from the check meets or exceeds the threshold value, a <<_monitor_event>> is recorded.
The <<_monitor_event>> table is synchronized on the heartbeat channel, which allows a central server to see events from remote nodes,
The <<_monitor_event>> table is synchronized on the "monitor" channel, which allows a central server to see events from remote nodes,
but this behavior can be disabled by setting the `monitor.events.capture.enabled` parameter to false.

To be immediately notified of a monitor event, use <<_notifications,Notifications>> to match on the severity level.
@@ -64,7 +64,12 @@ public MsSqlTriggerTemplate(ISymmetricDialect symmetricDialect) {
blobColumnTemplate = "case when $(origTableAlias).\"$(columnName)\" is null then '' else '\"' + replace(replace(" + defaultCatalog + "dbo.$(prefixName)_base64_encode(CONVERT(VARBINARY(max), $(origTableAlias).\"$(columnName)\")),'\\','\\\\'),'\"','\\\"') + '\"' end" ;
binaryColumnTemplate = blobColumnTemplate;
booleanColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then '' when $(tableAlias).\"$(columnName)\" = 1 then '\"1\"' else '\"0\"' end" ;
dateTimeWithTimeZoneColumnTemplate = " convert(varchar, $(tableAlias).\"$(columnName)\", 127)";
//dateTimeWithTimeZoneColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then '' else ('\"' + convert(varchar, $(tableAlias).\"$(columnName)\", 127) + '\"') end";
dateTimeWithTimeZoneColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then '' else ('\"' + convert(varchar,cast($(tableAlias).\"$(columnName)\" as datetime2), 121) + ' ' + case when datepart(tz, $(tableAlias).\"$(columnName)\") > 0 then '+' else '-' end + RIGHT('0' + cast(abs(datepart(tz, $(tableAlias).\"$(columnName)\") / 60) as varchar), 2) + ':' + RIGHT('0' + cast(datepart(tz, $(tableAlias).\"$(columnName)\") % 60 as varchar), 2) + '\"') end";




triggerConcatCharacter = "+" ;
newTriggerValue = "inserted" ;
oldTriggerValue = "deleted" ;
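The offset arithmetic in the new `dateTimeWithTimeZoneColumnTemplate` can be rendered in plain Java for clarity (a sketch, not SymmetricDS code): `datepart(tz, ...)` yields the offset in minutes, and the template appends a sign plus zero-padded hours and minutes. Note the T-SQL emits `-` for a zero offset, which this sketch reproduces.

```java
public class TzOffsetFormat {
    // Mirrors the T-SQL in the template above: sign from the minute offset,
    // hours = abs(offset / 60), minutes = abs(offset % 60), both zero-padded.
    public static String format(int tzMinutes) {
        String sign = tzMinutes > 0 ? "+" : "-"; // T-SQL uses '-' for zero, too
        int hours = Math.abs(tzMinutes / 60);
        int minutes = Math.abs(tzMinutes % 60);
        return String.format("%s%02d:%02d", sign, hours, minutes);
    }
}
```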
@@ -371,15 +371,25 @@ public String massageForLob(String sql, boolean isContainsBigLob) {

@Override
public boolean isInitialLoadTwoPassLob(Table table) {
return parameterService.is(ParameterConstants.INITIAL_LOAD_EXTRACT_USE_TWO_PASS_LOB)
&& !TableConstants.getTables(parameterService.getTablePrefix()).contains(table.getNameLowerCase())
&& table.containsLobColumns(this.platform);
boolean initialLoadTwoPassLob = false;
if (parameterService.is(ParameterConstants.INITIAL_LOAD_EXTRACT_USE_TWO_PASS_LOB)
&& !TableConstants.getTables(parameterService.getTablePrefix()).contains(table.getNameLowerCase())) {
for (Column column : table.getLobColumns(this.platform)) {
if (!column.getJdbcTypeName().equalsIgnoreCase("LONG")) {
initialLoadTwoPassLob = true;
break;
}
}
}
return initialLoadTwoPassLob;
}

@Override
public String getInitialLoadTwoPassLobLengthSql(Column column, boolean isFirstPass) {
String quote = this.platform.getDdlBuilder().getDatabaseInfo().getDelimiterToken();
if (isFirstPass) {
if (column.getJdbcTypeName().equalsIgnoreCase("LONG")) {
return isFirstPass ? "1=1" : "1=0";
} else if (isFirstPass) {
return "dbms_lob.getlength(t." + quote + column.getName() + quote + ") <= 4000";
}
return "dbms_lob.getlength(t." + quote + column.getName() + quote + ") > 4000";
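The predicate logic in this hunk can be summarized as a standalone sketch (assuming the double-quote delimiter token and the 4000-character threshold shown above): pass one of the initial load takes LOB rows whose length fits in character extraction, pass two takes the rest, and LONG columns, which cannot be measured with `dbms_lob.getlength`, are forced into pass one and skipped in pass two.

```java
public class TwoPassLobPredicate {
    // Builds the WHERE fragment used to split LOB rows across the two
    // initial-load passes, matching the branches in the hunk above.
    public static String lengthSql(String jdbcTypeName, String columnName,
            boolean isFirstPass) {
        if ("LONG".equalsIgnoreCase(jdbcTypeName)) {
            return isFirstPass ? "1=1" : "1=0"; // LONG length is not measurable
        }
        String length = "dbms_lob.getlength(t.\"" + columnName + "\")";
        return isFirstPass ? length + " <= 4000" : length + " > 4000";
    }
}
```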
@@ -44,8 +44,10 @@ public OracleTriggerTemplate(ISymmetricDialect symmetricDialect) {
dateColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS','NLS_CALENDAR=''GREGORIAN''')),'\"'))" ;
clobColumnTemplate = "decode(dbms_lob.getlength(to_clob($(tableAlias).\"$(columnName)\")), null, to_clob(''), '\"'||replace(replace($(tableAlias).\"$(columnName)\",'\\','\\\\'),'\"','\\\"')||'\"')" ;
blobColumnTemplate = "decode(dbms_lob.getlength($(tableAlias).\"$(columnName)\"), null, to_clob(''), '\"'||$(prefixName)_blob2clob($(tableAlias).\"$(columnName)\")||'\"')" ;
longColumnTemplate = "$(oracleToClob)'\"\\b\"'";
booleanColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', '\"'||cast($(tableAlias).\"$(columnName)\" as number("+symmetricDialect.getTemplateNumberPrecisionSpec()+"))||'\"')" ;
xmlColumnTemplate = "decode(dbms_lob.getlength(extract($(tableAlias).\"$(columnName)\", '/').getclobval()), null, to_clob(''), '\"'||replace(replace(extract($(tableAlias).\"$(columnName)\", '/').getclobval(),'\\','\\\\'),'\"','\\\"')||'\"')" ;
binaryColumnTemplate = blobColumnTemplate;
triggerConcatCharacter = "||" ;
newTriggerValue = ":new" ;
oldTriggerValue = ":old" ;
@@ -205,7 +205,8 @@ protected void bulkWrite(CsvData data) {

protected void flush() {
if (loadedRows > 0) {
this.stagedInputFile.close();
this.stagedInputFile.close();

statistics.get(batch).startTimer(DataWriterStatisticConstants.LOADMILLIS);
String filename;
if (StringUtils.isEmpty(uncPath)) {
@@ -240,11 +241,12 @@ protected void flush() {
//TODO: clean this up, deal with errors, etc.?
stmt.execute(sql);
stmt.close();

loadedRows = 0;
} catch (SQLException ex) {
throw getPlatform().getSqlTemplate().translate(ex);
} finally {
statistics.get(batch).stopTimer(DataWriterStatisticConstants.LOADMILLIS);
this.stagedInputFile.delete();
}
}
}
@@ -258,8 +258,7 @@ public void testDuplicateRow(){
}
}

protected abstract AbstractDatabaseWriter create();

protected abstract AbstractDatabaseWriter create();

@Override
protected void assertTestTableEquals(String testTableId, String[] expectedValues) {
@@ -57,11 +57,6 @@ public void setupTest() {
protected boolean shouldTestRun(IDatabasePlatform platform) {
return platform != null && platform instanceof MySqlDatabasePlatform;
}

@Override
public void testDuplicateRow() {
/* mysql already handles duplicates. no need to test the special functionality we added to handle dupes */
}

protected AbstractDatabaseWriter create(){
return new MySqlBulkDatabaseWriter(platform, platform, "sym_", stagingManager, new CommonsDbcpNativeJdbcExtractor(), 10, 1000,true, true);
@@ -31,12 +31,12 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.io.DatabaseXmlUtil;
@@ -139,8 +139,9 @@

abstract public class AbstractSymmetricEngine implements ISymmetricEngine {

private static Map<String, ISymmetricEngine> registeredEnginesByUrl = new HashMap<String, ISymmetricEngine>();
private static Map<String, ISymmetricEngine> registeredEnginesByName = new HashMap<String, ISymmetricEngine>();
private static Map<String, ISymmetricEngine> registeredEnginesByUrl = new ConcurrentHashMap<String, ISymmetricEngine>();

private static Map<String, ISymmetricEngine> registeredEnginesByName = new ConcurrentHashMap<String, ISymmetricEngine>();

protected static final Logger log = LoggerFactory.getLogger(AbstractSymmetricEngine.class);

@@ -882,8 +883,12 @@ public synchronized void destroy() {
removeMeFromMap(registeredEnginesByUrl);
if (parameterService != null) {
parameterService.setDatabaseHasBeenInitialized(false);
registeredEnginesByName.remove(getEngineName());
registeredEnginesByUrl.remove(getSyncUrl());
if (getEngineName() != null) {
registeredEnginesByName.remove(getEngineName());
}
if (getSyncUrl() != null) {
registeredEnginesByUrl.remove(getSyncUrl());
}
}
stop();
if (jobManager != null) {
@@ -1227,7 +1232,10 @@ private void removeMeFromMap(Map<String, ISymmetricEngine> map) {
*/
private void registerHandleToEngine() {
String url = getSyncUrl();
ISymmetricEngine alreadyRegister = registeredEnginesByUrl.get(url);
ISymmetricEngine alreadyRegister = null;
if (url != null) {
alreadyRegister = registeredEnginesByUrl.get(url);
}
if (alreadyRegister == null || alreadyRegister.equals(this)) {
if (url != null) {
registeredEnginesByUrl.put(url, this);
@@ -1237,7 +1245,9 @@ private void registerHandleToEngine() {
getSyncUrl());
}

alreadyRegister = registeredEnginesByName.get(getEngineName());
if (getEngineName() != null) {
alreadyRegister = registeredEnginesByName.get(getEngineName());
}
if (alreadyRegister == null || alreadyRegister.equals(this)) {
registeredEnginesByName.put(getEngineName(), this);
} else {
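The null guards added in these hunks follow directly from the switch to `ConcurrentHashMap`: unlike `HashMap`, it rejects null keys with a `NullPointerException`, so `getEngineName()` and `getSyncUrl()` must be checked before being used as map keys. A minimal illustration:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class NullKeyDemo {
    // Returns true if the map throws NullPointerException on a null key lookup.
    public static boolean rejectsNullKey(Map<String, String> map) {
        try {
            map.get(null);
            return false; // HashMap: get(null) simply returns null
        } catch (NullPointerException e) {
            return true;  // ConcurrentHashMap: null keys are forbidden
        }
    }
}
```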
@@ -124,6 +124,7 @@ private ParameterConstants() {
public final static String AUTO_RESOLVE_FOREIGN_KEY_VIOLATION = "auto.resolve.foreign.key.violation";
public final static String AUTO_RESOLVE_FOREIGN_KEY_VIOLATION_REVERSE = "auto.resolve.foreign.key.violation.reverse";
public final static String AUTO_RESOLVE_FOREIGN_KEY_VIOLATION_REVERSE_PEERS = "auto.resolve.foreign.key.violation.reverse.peers";
public final static String AUTO_RESOLVE_FOREIGN_KEY_VIOLATION_REVERSE_RELOAD = "auto.resolve.foreign.key.violation.reverse.reload";
public final static String AUTO_INSERT_REG_SVR_IF_NOT_FOUND = "auto.insert.registration.svr.if.not.found";
public final static String AUTO_SYNC_CONFIGURATION = "auto.sync.configuration";
public final static String AUTO_SYNC_CONFIGURATION_ON_INCOMING = "auto.sync.configuration.on.incoming";
@@ -101,6 +101,8 @@ abstract public class AbstractTriggerTemplate {

protected String blobColumnTemplate;

protected String longColumnTemplate;

protected String binaryColumnTemplate;

protected String imageColumnTemplate;
@@ -874,7 +876,11 @@ else if (column.getJdbcTypeName() != null
break;
case Types.LONGVARCHAR:
case ColumnTypes.LONGNVARCHAR:
if (!isLob) {
if (column.getJdbcTypeName().equalsIgnoreCase("LONG") && isNotBlank(longColumnTemplate)) {
templateToUse = longColumnTemplate;
isLob = false;
break;
} else if (!isLob) {
templateToUse = stringColumnTemplate;
break;
}
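The added branch can be read as the following selection rule (a sketch with hypothetical class and parameter names, not the real template-resolution code): when a LONGVARCHAR column's JDBC type is Oracle's LONG and a dedicated template is configured, that template is used, since the new `longColumnTemplate` in `OracleTriggerTemplate` emits a placeholder rather than reading the LONG value; otherwise the usual string or LOB template applies.

```java
public class LongTemplateSelection {
    // Mirrors the new LONGVARCHAR/LONGNVARCHAR branch shown above:
    // a LONG column with a non-blank dedicated template wins; everything
    // else falls through to the string template (or LOB template if isLob).
    public static String choose(String jdbcTypeName, boolean isLob,
            String longTemplate, String stringTemplate, String lobTemplate) {
        if ("LONG".equalsIgnoreCase(jdbcTypeName)
                && longTemplate != null && !longTemplate.trim().isEmpty()) {
            return longTemplate;
        }
        return isLob ? lobTemplate : stringTemplate;
    }
}
```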

0 comments on commit 243f568
