Skip to content

Commit

Permalink
Merge branch '3.12' of https://github.com/JumpMind/symmetric-ds.git i…
Browse files Browse the repository at this point in the history
…nto 3.12
  • Loading branch information
vanmetjk committed May 27, 2020
2 parents e6a8dc4 + adef033 commit 5e5e9e2
Show file tree
Hide file tree
Showing 17 changed files with 132 additions and 86 deletions.
4 changes: 2 additions & 2 deletions symmetric-assemble/src/asciidoc/configuration/group-links.ad
Expand Up @@ -43,11 +43,11 @@ ifndef::pro[]
[source,sql]
----
insert into SYM_NODE_GROUP_LINK
(source_node_group, target_node_group, data_event_action)
(source_node_group_id, target_node_group_id, data_event_action)
values ('store', 'corp', 'P');

insert into SYM_NODE_GROUP_LINK
(source_node_group, target_node_group, data_event_action)
(source_node_group_id, target_node_group_id, data_event_action)
values ('corp', 'store', 'W');
----
endif::pro[]
Expand Down
17 changes: 9 additions & 8 deletions symmetric-assemble/src/asciidoc/configuration/table-triggers.ad
Expand Up @@ -55,19 +55,19 @@ ifdef::pro[]
.Advanced Options
endif::pro[]

Sync On Insert:: Determines if changes will be captured for inserts.
Sync On Update:: Determines if changes will be captured for updates.
Sync On Delete:: Determines if changes will be captured for deletes.
Sync On Insert:: Flag for installing an insert trigger.
Sync On Update:: Flag for installing an update trigger.
Sync On Delete:: Flag for installing a delete trigger.
Reload Channel Id:: The channel_id of the channel that will be used for initial loads.
Sync On Insert Condition:: Specify a condition for the insert trigger firing using an expression specific to the database. On most platforms, it is added to an "IF" statement in the trigger text. On SQL-Server it is added to the "WHERE" clause of a query for inserted/deleted logical tables. See Sync Condition Example.
Sync On Update Condition:: Specify a condition for the update trigger firing using an expression specific to the database. On most platforms, it is added to an "IF" statement in the trigger text. On SQL-Server it is added to the "WHERE" clause of a query for inserted/deleted logical tables. See Sync Condition Example.
Sync On Delete Condition:: Specify a condition for the delete trigger firing using an expression specific to the database. On most platforms, it is added to an "IF" statement in the trigger text. On SQL-Server it is added to the "WHERE" clause of a query for inserted/deleted logical tables. See Sync Condition Example.
Sync Condition Example:: Sync Conditions can access both old values and new values of a field/column using "old_" and "new_" respectively. For example, if your column is id and your condition checks the value coming in to be 'test', your condition will be:
Sync Conditions:: A procedure language expression included in the trigger text to determine whether a change is captured or not. Most platforms include the condition inside an "IF" statement, while SQL-Server includes the condition in a "WHERE" clause. Old and new values of a column can be referenced using "$(oldTriggerValue)" and "$(newTriggerValue)" aliases respectively. See <<Trigger Variables>>. For example, if a character column is named "STATUS" and the row should be captured when the value is "2", then the condition would be:
+
----
new_id = 'test'
$(newTriggerValue).status = '2'
----

Sync On Insert Condition:: Conditional expression for the insert trigger to determine if a change is captured or not. See Sync Conditions.
Sync On Update Condition:: Conditional expression for the update trigger to determine if a change is captured or not. See Sync Conditions.
Sync On Delete Condition:: Conditional expression for the delete trigger to determine if a change is captured or not. See Sync Conditions.
Custom Insert Trigger Text:: Specify insert trigger text (SQL) to execute after the SymmetricDS trigger fires. This field is not applicable for H2, HSQLDB 1.x or Apache Derby.
Custom Update Trigger Text:: Specify update trigger text (SQL) to execute after the SymmetricDS trigger fires. This field is not applicable for H2, HSQLDB 1.x or Apache Derby.
Custom Delete Trigger Text:: Specify delete trigger text (SQL) to execute after the SymmetricDS trigger fires. This field is not applicable for H2, HSQLDB 1.x or Apache Derby.
Expand Down Expand Up @@ -129,4 +129,5 @@ This property is currently only supported on MySQL, DB2, SQL Server, and Oracle.

include::table-triggers/wildcards.ad[]
include::table-triggers/external-select.ad[]
include::table-triggers/trigger-variables.ad[]
include::table-triggers/load-only.ad[]
Expand Up @@ -6,7 +6,8 @@ This data is typically needed for the purposes of determining where to 'route' t
definition contains an optional "external select" field which can be used to specify the data to be captured. Once captured, this
data is available during routing in DATA 's external_data field.

For these cases, place a SQL select statement which returns the data item you need for routing in external_select.
For these cases, place a SQL select statement which returns the data item you need for routing in external_select.
See <<Trigger Variables>> for a list of variables available for use.

IMPORTANT: The external select SQL must return a single row, single column

Expand All @@ -32,20 +33,6 @@ values ('orderlineitem', 'orderlineitem','orderlineitem',
====
endif::pro[]

.The following variables can be used with the external select
[cols=".^2,8"]
|===

|$(curTriggerValue)|Variable to be replaced with the NEW or OLD column alias provided by the trigger context, which is platform specific.
For insert and update triggers, the NEW alias is used; for delete triggers, the OLD alias is used. For example, "$(curTriggerValue).COLUMN"
becomes ":new.COLUMN" for an insert trigger on Oracle.

|$(curColumnPrefix)|Variable to be replaced with the NEW_ or OLD_ column prefix for platforms that don't support column aliases. This is
currently only used by the H2 database. All other platforms will replace the variable with an empty string. For example "$(curColumnPrefix)COLUMN"
becomes "NEW_COLUMN" on H2 and "COLUMN" on Oracle.

|===

WARNING: External select SQL statements should be used carefully as they will cause the trigger to run the additional SQL each time the trigger fires.

TIP: Using an external select on the trigger is similar to using the 'subselect' router. The advantage of this approach over the 'subselect' approach
Expand Down
@@ -0,0 +1,21 @@

==== Trigger Variables

The Sync Condition, External Select, and Custom Trigger Text configurations allow the user to provide
procedure language text that is included inside the trigger.
Variables can be used for configuration that works across different database platforms.
When triggers are created, the variables are replaced with the syntax needed for that specific database.

.Trigger Template Variables
[cols=".^2,8"]
|===

|$(newTriggerValue)|New row alias for inserts and updates. For example, "$(newTriggerValue).MYCOLUMN" becomes ":new.MYCOLUMN" for an insert/update trigger on Oracle.

|$(oldTriggerValue)|Old row alias for updates and deletes. For example, "$(oldTriggerValue).MYCOLUMN" becomes ":old.MYCOLUMN" for an update/delete trigger on Oracle.

|$(curTriggerValue)|Current row alias for insert, updates, and deletes. This variable acts like $(newTriggerValue) for inserts and updates, and it acts like $(oldTriggerValue) for deletes.

|$(curColumnPrefix)|Column prefix only used by H2 database. It is replaced with the NEW_ or OLD_ column prefix needed by H2. All other platforms will replace the variable with an empty string

|===
Expand Up @@ -35,19 +35,19 @@ public OracleTriggerTemplate(ISymmetricDialect symmetricDialect) {
// @formatter:off

emptyColumnTemplate = "''" ;
stringColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, $(oracleToClob)'', '\"'||replace(replace($(oracleToClob)$(tableAlias).\"$(columnName)\",'\\','\\\\'),'\"','\\\"')||'\"')" ;
geometryColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then to_clob('') else '\"'||replace(replace(SDO_UTIL.TO_WKTGEOMETRY($(tableAlias).\"$(columnName)\"),'\\','\\\\'),'\"','\\\"')||'\"' end";
numberColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', '\"'||" + getNumberConversionString() + "||'\"')" ;
datetimeColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS.FF9')),'\"'))" ;
dateTimeWithTimeZoneColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS.FF9 TZH:TZM')),'\"'))" ;
dateTimeWithLocalTimeZoneColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char(cast($(tableAlias).\"$(columnName)\" as timestamp), 'YYYY-MM-DD HH24:MI:SS.FF9')),'\"'))" ;
timeColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS','NLS_CALENDAR=''GREGORIAN''')),'\"'))" ;
dateColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS','NLS_CALENDAR=''GREGORIAN''')),'\"'))" ;
clobColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then null else '\"'||replace(replace($(tableAlias).\"$(columnName)\",'\\','\\\\'),'\"','\\\"')||'\"' end" ;
blobColumnTemplate = "decode(dbms_lob.getlength($(tableAlias).\"$(columnName)\"), null, to_clob(''), '\"'||$(prefixName)_blob2clob($(tableAlias).\"$(columnName)\")||'\"')" ;
stringColumnTemplate = "nvl2($(tableAlias).\"$(columnName)\", '\"'||replace(replace($(oracleToClob)$(tableAlias).\"$(columnName)\",'\\','\\\\'),'\"','\\\"')||'\"', '')";
geometryColumnTemplate = "nvl2($(tableAlias).\"$(columnName)\", '\"'||replace(replace(SDO_UTIL.TO_WKTGEOMETRY($(tableAlias).\"$(columnName)\"),'\\','\\\\'),'\"','\\\"')||'\"', '')";
numberColumnTemplate = "nvl2($(tableAlias).\"$(columnName)\", $(oracleToClob)'\"'||" + getNumberConversionString() + "||'\"', '')";
datetimeColumnTemplate = "nvl2($(tableAlias).\"$(columnName)\", $(oracleToClob)concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS.FF9')),'\"'), '')";
dateTimeWithTimeZoneColumnTemplate = "nvl2($(tableAlias).\"$(columnName)\", $(oracleToClob)concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS.FF9 TZH:TZM')),'\"'), '')";
dateTimeWithLocalTimeZoneColumnTemplate = "nvl2($(tableAlias).\"$(columnName)\", $(oracleToClob)concat(concat('\"',to_char(cast($(tableAlias).\"$(columnName)\" as timestamp), 'YYYY-MM-DD HH24:MI:SS.FF9')),'\"'), '')";
timeColumnTemplate = "nvl2($(tableAlias).\"$(columnName)\", $(oracleToClob)concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS','NLS_CALENDAR=''GREGORIAN''')),'\"'), '')";
dateColumnTemplate = "nvl2($(tableAlias).\"$(columnName)\", $(oracleToClob)concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS','NLS_CALENDAR=''GREGORIAN''')),'\"'), '')";
clobColumnTemplate = "nvl2($(tableAlias).\"$(columnName)\", '\"'||replace(replace($(tableAlias).\"$(columnName)\",'\\','\\\\'),'\"','\\\"')||'\"', '')";
blobColumnTemplate = "nvl2($(tableAlias).\"$(columnName)\", '\"'||$(prefixName)_blob2clob($(tableAlias).\"$(columnName)\")||'\"', '')";
longColumnTemplate = "$(oracleToClob)'\"\\b\"'";
booleanColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', '\"'||cast($(tableAlias).\"$(columnName)\" as number("+symmetricDialect.getTemplateNumberPrecisionSpec()+"))||'\"')" ;
xmlColumnTemplate = "decode(dbms_lob.getlength(extract($(tableAlias).\"$(columnName)\", '/').getclobval()), null, to_clob(''), '\"'||replace(replace(extract($(tableAlias).\"$(columnName)\", '/').getclobval(),'\\','\\\\'),'\"','\\\"')||'\"')" ;
booleanColumnTemplate = "nvl2($(tableAlias).\"$(columnName)\", '\"'||cast($(tableAlias).\"$(columnName)\" as number("+symmetricDialect.getTemplateNumberPrecisionSpec()+"))||'\"', '')";
xmlColumnTemplate = "nvl2(extract($(tableAlias).\"$(columnName)\", '/').getclobval(), '\"'||replace(replace(extract($(tableAlias).\"$(columnName)\", '/').getclobval(),'\\','\\\\'),'\"','\\\"')||'\"', '')";
binaryColumnTemplate = blobColumnTemplate;
triggerConcatCharacter = "||" ;
newTriggerValue = ":new" ;
Expand Down
Expand Up @@ -287,6 +287,7 @@ private ParameterConstants() {
public final static String TRANSPORT_HTTP_COMPRESSION_STRATEGY = "compression.strategy";
public final static String TRANSPORT_HTTP_USE_SESSION_AUTH = "http.use.session.auth";
public final static String TRANSPORT_HTTP_SESSION_EXPIRE_SECONDS = "http.session.expire.seconds";
public final static String TRANSPORT_HTTP_SESSION_MAX_COUNT = "http.session.max.count";
public final static String TRANSPORT_HTTP_USE_HEADER_SECURITY_TOKEN = "http.use.header.security.token";
public final static String TRANSPORT_TYPE = "transport.type";
public final static String TRANSPORT_MAX_BYTES_TO_SYNC = "transport.max.bytes.to.sync";
Expand Down
Expand Up @@ -544,7 +544,8 @@ protected String replaceTemplateVariables(DataEventType dml, Trigger trigger,
ddl = FormatUtils.replace("sourceNodeExpression",
symmetricDialect.getSourceNodeExpression(), ddl);

ddl = FormatUtils.replace("oracleLobType", trigger.isUseCaptureLobs() ? getClobType(table) : "long",
ddl = FormatUtils.replace("oracleLobType", trigger.isUseCaptureLobs() ? getClobType(table) :
symmetricDialect.getParameterService().is(ParameterConstants.DBDIALECT_ORACLE_USE_NTYPES_FOR_SYNC) ? "NVARCHAR2(4000)" : "VARCHAR2(4000)",
ddl);
ddl = FormatUtils.replace("oracleLobTypeClobAlways", getClobType(table), ddl);

Expand Down
Expand Up @@ -72,7 +72,7 @@ public void setSymmetricEngine(ISymmetricEngine engine) {
protected String serializeDetails(List<String> offlineNodes) {
String result = null;
try {
new Gson().toJson(offlineNodes);
result = new Gson().toJson(offlineNodes);
} catch(Exception e) {
log.warn("Unable to convert list of offline nodes to JSON", e);
}
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.jumpmind.symmetric.ext.IHeartbeatListener;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.load.IReloadGenerator;
import org.jumpmind.symmetric.model.AbstractBatch.Status;
import org.jumpmind.symmetric.model.Data;
import org.jumpmind.symmetric.model.DataEvent;
Expand Down Expand Up @@ -103,8 +104,11 @@ public String reloadTableImmediate(String nodeId, String catalogName, String sch

public String sendSQL(String nodeId, String sql);

public Map<Integer, ExtractRequest> insertReloadEvents(Node targetNode, boolean reverse, List<TableReloadRequest> reloadRequests, ProcessInfo processInfo,
List<TriggerHistory> activeHistories, List<TriggerRouter> triggerRouters, Map<Integer, ExtractRequest> extractRequests);
public Map<Integer, ExtractRequest> insertReloadEvents(
Node targetNode, boolean reverse, List<TableReloadRequest> reloadRequests,
ProcessInfo processInfo, List<TriggerRouter> triggerRouters,
Map<Integer, ExtractRequest> extractRequests,
IReloadGenerator reloadGenerator);

public boolean insertReloadEvent(TableReloadRequest request, boolean deleteAtClient);

Expand Down
Expand Up @@ -1028,13 +1028,13 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo extractInfo, Node targe

if (currentBatch.getStatus() == Status.IG) {
cleanupIgnoredBatch(sourceNode, targetNode, currentBatch, writer);
} else if (!isPreviouslyExtracted(currentBatch, false)) {
} else if (currentBatch.getStatus() == Status.RQ || !isPreviouslyExtracted(currentBatch, false)) {
BatchLock lock = null;
try {
log.debug("{} attempting to acquire lock for batch {}", targetNode.getNodeId(), currentBatch.getBatchId());
lock = acquireLock(currentBatch, useStagingDataWriter);
log.debug("{} acquired lock for batch {}", targetNode.getNodeId(), currentBatch.getBatchId());
if (!isPreviouslyExtracted(currentBatch, true)) {
if (currentBatch.getStatus() == Status.RQ || !isPreviouslyExtracted(currentBatch, true)) {
log.debug("{} extracting batch {}", targetNode.getNodeId(), currentBatch.getBatchId());
currentBatch.setExtractCount(currentBatch.getExtractCount() + 1);

Expand Down
Expand Up @@ -71,6 +71,7 @@
import org.jumpmind.symmetric.io.data.transform.TransformPoint;
import org.jumpmind.symmetric.job.OracleNoOrderHeartbeat;
import org.jumpmind.symmetric.job.PushHeartbeatListener;
import org.jumpmind.symmetric.load.IReloadGenerator;
import org.jumpmind.symmetric.load.IReloadListener;
import org.jumpmind.symmetric.load.IReloadVariableFilter;
import org.jumpmind.symmetric.model.AbstractBatch.Status;
Expand Down Expand Up @@ -813,13 +814,22 @@ private String getReloadChannelIdForTrigger(Trigger trigger, Map<String, Channel

@Override
public Map<Integer, ExtractRequest> insertReloadEvents(Node targetNode, boolean reverse, List<TableReloadRequest> reloadRequests, ProcessInfo processInfo,
List<TriggerHistory> activeHistories, List<TriggerRouter> triggerRouters, Map<Integer, ExtractRequest> extractRequests) {
List<TriggerRouter> triggerRouters, Map<Integer, ExtractRequest> extractRequests,
IReloadGenerator reloadGenerator)
{
if (engine.getClusterService().lock(ClusterConstants.SYNC_TRIGGERS)) {
try {
INodeService nodeService = engine.getNodeService();
ITriggerRouterService triggerRouterService = engine.getTriggerRouterService();

synchronized (triggerRouterService) {

List<TriggerHistory> activeHistories = null;
if (reloadGenerator == null) {
activeHistories = triggerRouterService.getActiveTriggerHistories();
} else {
activeHistories = reloadGenerator.getActiveTriggerHistories(targetNode);
}

boolean isFullLoad = reloadRequests == null
|| (reloadRequests.size() == 1 && reloadRequests.get(0).isFullLoadRequest());
Expand Down

0 comments on commit 5e5e9e2

Please sign in to comment.