Skip to content

Commit

Permalink
0003425: Adding and modifying variables used in initial load and
Browse files Browse the repository at this point in the history
configuration
  • Loading branch information
erilong committed Feb 14, 2018
1 parent e9e094a commit 1565654
Show file tree
Hide file tree
Showing 10 changed files with 142 additions and 16 deletions.
39 changes: 39 additions & 0 deletions symmetric-assemble/src/asciidoc/advanced-topics.ad
Original file line number Diff line number Diff line change
Expand Up @@ -581,3 +581,42 @@ directory is cleared out before each subsequent delivery of files.

The acknowledgement of a batch happens the same way it is acknowledged in database synchronization. The client responds with an
acknowledgement as part of the response during a file push or pull.

=== Variables

Variables can be used throughout configuration with the `$(variableName)` syntax. Check the documentation for each configuration
item to see which variables it supports. A substring of the variable value can be specified with a starting index and an optional
ending index. The first character is at index 0, and the end index is not included in the substring.
Therefore, the length of the substring will be end index minus start index.

[source, cli]
----
$(variableName:start)
$(variableName:start:end)
----

Examples when `$(externalId)` is set to `00001-002`:

[source, cli]
----
$(externalId:0:5) - returns 00001
$(externalId:6) - returns 002
----

The variable value can be formatted using a https://docs.oracle.com/javase/8/docs/api/java/util/Formatter.html#syntax[format string]
supported by `java.lang.String.format()`.

[source, cli]
----
$(variableName|format_string)
----

Examples when `$(externalId)` is set to `1`:

[source, cli]
----
$(variableName|%05d) - returns 00001
----



4 changes: 2 additions & 2 deletions symmetric-assemble/src/asciidoc/configuration/routers.ad
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ Router Expression:: An expression that is specific to the type of router that is
If this field is unspecified, the catalog will be either the default catalog at the target node or the "source catalog name" from the table trigger,
depending on how "use source catalog schema" is set for the router. Variables are substituted for `$(sourceNodeId)`, `$(sourceExternalId)`, `$(sourceNodeGroupId)`,
`$(targetNodeId)`, `$(targetExternalId)`, `$(targetNodeGroupId)`, and `$(none)`.
Parameter values can be substituted using `$(name)` syntax.
Parameter values can be substituted using `$(name)` syntax. See <<Variables>>.
[[router-target-schema]]Target Schema:: Optional name of schema where a target table is located.
If this field is unspecified, the schema will be either the default schema at the target node or the "source schema name" from the table trigger,
depending on how "use source catalog schema" is set for the router. Variables are substituted for `$(sourceNodeId)`, `$(sourceExternalId)`, `$(sourceNodeGroupId)`,
`$(targetNodeId)`, `$(targetExternalId)`, `$(targetNodeGroupId)`, and `$(none)`.
Parameter values can be substituted using `$(name)` syntax.
Parameter values can be substituted using `$(name)` syntax. See <<Variables>>.

ifdef::pro[]
.Advanced Options
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ endif::pro[]
Trigger Id:: Unique identifier for a trigger.
Source Catalog:: Optional name for the catalog the configured table is in. If the name includes * then a wildcard match on the table name will be attempted. \
Wildcard names can include a list of names that are comma separated. The ! symbol may be used to indicate a NOT match condition.
Parameter values can be substituted using `$(name)` syntax.
Parameter values can be substituted using `$(name)` syntax. See <<Variables>>.
Source Schema:: Optional name for the schema a configured table is in. If the name includes * then a wildcard match on the table name will be attempted.
Wildcard names can include a list of names that are comma separated. The ! symbol may be used to indicate a NOT match condition.
Parameter values can be substituted using `$(name)` syntax.
Parameter values can be substituted using `$(name)` syntax. See <<Variables>>.
Source Table:: The name of the source table that will have a trigger installed to watch for data changes. See <<Trigger Wildcards>> for using wildcards
to specify multiple source tables.
Parameter values can be substituted using `$(name)` syntax.
Parameter values can be substituted using `$(name)` syntax. See <<Variables>>.
Channel:: The channel_id of the channel that data changes will flow through.

ifdef::pro[]
Expand Down Expand Up @@ -83,7 +83,7 @@ It will be used in the generated database trigger to populate the EXTERNAL_DATA
Excluded Column Names:: Specify a comma-delimited list of columns that should not be synchronized from this table.
Included Column Names:: Specify a comma-delimited list of columns only should be synchronized from this table.
Sync Key Names:: Specify a comma-delimited list of columns that should be used as the key for synchronization operations. By default, if not specified, then the primary key of the table will be used.
Channel Expression:: An expression that will be used to capture the channel id in the trigger. This expression will only be used if the channel_id is set to 'dynamic'. The variable "$(schemaName)" can be used, which is replaced with the source schema of the table.
Channel Expression:: An expression that will be used to capture the channel id in the trigger. This expression will only be used if the channel_id is set to 'dynamic'. The variable "$(schemaName)" can be used, which is replaced with the source schema of the table. See <<Variables>>.

.Sample Triggers
====
Expand Down
3 changes: 3 additions & 0 deletions symmetric-assemble/src/asciidoc/developer.ad
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ Implement this extension point to receive callback events when a batch is acknow
===== IReloadListener
Implement this extension point to listen in and take action before or after a reload is requested for a Node. The callback for this listener happens at the point of extraction.

===== IReloadVariableFilter
Implement this extension point to filter the SQL used by the initial load to query source tables and purge target tables. The extension receives the SQL and can replace variable names with values, which allows for adding new variables. The org.jumpmind.util.FormatUtils.replace() method can be used to find and replace variables.

===== ISyncUrlExtension
This extension point is used to select an appropriate URL based on the URI provided in the sync_url column of sym_node.

Expand Down
4 changes: 3 additions & 1 deletion symmetric-assemble/src/asciidoc/manage/node-load.ad
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ insert into SYM_TABLE_RELOAD_REQUEST (target_node_id, source_node_id, trigger_id

===== Load data for a specific table with partial data
Insert a row into <<TABLE_RELOAD_REQUEST>> and set the reload_select to the where clause to run while extracting data. There are 3 variables
available for replacement.
available for replacement. See <<Variables>>.

* $(groupId)
* $(nodeId)
Expand Down Expand Up @@ -113,6 +113,8 @@ Partial loads will then see a table selection screen. Full loads will immediate
* $(nodeId)
* $(externalId)

See <<Variables>>.

image::manage/manage-load-data-tables.png[]

Finally a summary screen is presented to review all the settings for the load prior to adding it to the request queue. Loads are checked by the routing process so once the load has been saved it will be picked up and begin processing on the next run of the routing job.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.load;

import org.jumpmind.db.model.Table;
import org.jumpmind.extension.IExtensionPoint;
import org.jumpmind.symmetric.model.Node;

/**
* Filter the SQL used during initial load to query the source table and purge the target table.
*
* @see org.jumpmind.util.FormatUtils.replace()
*
*/
public interface IReloadVariableFilter extends IExtensionPoint {

public String filterInitalLoadSql(String sql, Node targetNode, Table table);

public String filterPurgeSql(String sql, Node targetNode, Table table);

}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
import org.jumpmind.symmetric.io.stage.IStagedResource.State;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.io.stage.StagingFileLock;
import org.jumpmind.symmetric.load.IReloadVariableFilter;
import org.jumpmind.symmetric.model.AbstractBatch.Status;
import org.jumpmind.symmetric.model.Channel;
import org.jumpmind.symmetric.model.ChannelMap;
Expand Down Expand Up @@ -143,6 +144,7 @@
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.IDataExtractorService;
import org.jumpmind.symmetric.service.IDataService;
import org.jumpmind.symmetric.service.IExtensionService;
import org.jumpmind.symmetric.service.INodeCommunicationService;
import org.jumpmind.symmetric.service.INodeCommunicationService.INodeCommunicationExecutor;
import org.jumpmind.symmetric.service.INodeService;
Expand Down Expand Up @@ -197,6 +199,8 @@ protected enum ExtractMode { FOR_SYM_CLIENT, FOR_PAYLOAD_CLIENT, EXTRACT_ONLY };
private INodeCommunicationService nodeCommunicationService;

private IClusterService clusterService;

private IExtensionService extensionService;

private Map<String, BatchLock> locks = new HashMap<String, BatchLock>();

Expand All @@ -216,6 +220,7 @@ public DataExtractorService(ISymmetricEngine engine) {
this.nodeCommunicationService = engine.getNodeCommunicationService();
this.clusterService = engine.getClusterService();
this.sequenceService = engine.getSequenceService();
this.extensionService = engine.getExtensionService();
setSqlMap(new DataExtractorServiceSqlMap(symmetricDialect.getPlatform(),
createSqlReplacementTokens()));
}
Expand Down Expand Up @@ -2419,12 +2424,16 @@ protected void startNewCursor(final TriggerHistory triggerHistory,
if (overrideSelectSql != null && overrideSelectSql.trim().toUpperCase().startsWith("WHERE")) {
overrideSelectSql = overrideSelectSql.trim().substring(5);
}
final String initialLoadSql = symmetricDialect.createInitialLoadSqlFor(
String sql = symmetricDialect.createInitialLoadSqlFor(
this.currentInitialLoadEvent.getNode(), triggerRouter, sourceTable,
triggerHistory,
configurationService.getChannel(triggerRouter.getTrigger().getChannelId()),
overrideSelectSql);

for (IReloadVariableFilter filter : extensionService.getExtensionPointList(IReloadVariableFilter.class)) {
sql = filter.filterInitalLoadSql(sql, node, targetTable);
}

final String initialLoadSql = sql;
final int expectedCommaCount = triggerHistory.getParsedColumnNames().length - 1;
final boolean selectedAsCsv = symmetricDialect.getParameterService().is(
ParameterConstants.INITIAL_LOAD_CONCAT_CSV_IN_SQL_ENABLED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.jumpmind.symmetric.io.data.transform.TransformPoint;
import org.jumpmind.symmetric.job.PushHeartbeatListener;
import org.jumpmind.symmetric.load.IReloadListener;
import org.jumpmind.symmetric.load.IReloadVariableFilter;
import org.jumpmind.symmetric.model.AbstractBatch.Status;
import org.jumpmind.symmetric.model.Channel;
import org.jumpmind.symmetric.model.Data;
Expand Down Expand Up @@ -986,6 +987,9 @@ protected int getDataCountForReload(Table table, Node targetNode, String selectS
sql = FormatUtils.replace("groupId", targetNode.getNodeGroupId(), sql);
sql = FormatUtils.replace("externalId", targetNode.getExternalId(), sql);
sql = FormatUtils.replace("nodeId", targetNode.getNodeId(), sql);
for (IReloadVariableFilter filter : extensionService.getExtensionPointList(IReloadVariableFilter.class)) {
sql = filter.filterPurgeSql(sql, targetNode, table);
}

int rowCount = sqlTemplate.queryForInt(sql);
return rowCount;
Expand Down Expand Up @@ -1148,6 +1152,12 @@ protected void createPurgeEvent(ISqlTransaction transaction, String sql, Node ta
sql = FormatUtils.replace("sourceGroupId", sourceNode.getNodeGroupId(), sql);
sql = FormatUtils.replace("sourceExternalId", sourceNode.getExternalId(), sql);
sql = FormatUtils.replace("sourceNodeId", sourceNode.getNodeId(), sql);
Table table = new Table(triggerHistory.getSourceCatalogName(), triggerHistory.getSourceSchemaName(),
triggerHistory.getSourceTableName(), triggerHistory.getParsedColumnNames(), triggerHistory.getParsedPkColumnNames());
for (IReloadVariableFilter filter : extensionService.getExtensionPointList(IReloadVariableFilter.class)) {
sql = filter.filterPurgeSql(sql, targetNode, table);
}

String channelId = getReloadChannelIdForTrigger(triggerRouter.getTrigger(), engine
.getConfigurationService().getChannels(false));
Data data = new Data(triggerHistory.getSourceTableName(), DataEventType.SQL,
Expand Down
26 changes: 20 additions & 6 deletions symmetric-util/src/main/java/org/jumpmind/util/FormatUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ private FormatUtils() {
}

public static String replace(String prop, String replaceWith, String sourceString) {
return StringUtils.replace(sourceString, "$(" + prop + ")", replaceWith);
Map<String, String> replacements = new HashMap<String, String>(1);
replacements.put(prop, replaceWith);
return replaceTokens(sourceString, replacements, true);
}

public static String replaceToken(String text, String tokenToReplace, String replaceWithText,
Expand Down Expand Up @@ -100,12 +102,25 @@ public static String replaceTokens(String text, Map<String, String> replacements
Matcher matcher = pattern.matcher(text);
StringBuffer buffer = new StringBuffer();
while (matcher.find()) {
String[] match = matcher.group(1).split("\\|");
String replacement = replacements.get(match[0]);
String[] matchPipe = matcher.group(1).split("\\|");
String[] matchColon = matchPipe[0].split(":");
String replacement = replacements.get(matchColon[0]);
if (replacement != null) {
matcher.appendReplacement(buffer, "");
if (match.length == 2) {
replacement = formatString(match[1], replacement);
if (matchColon.length == 2) {
int startIndex = Integer.parseInt(matchColon[1]);
if (startIndex <= replacement.length()) {
replacement = replacement.substring(Integer.parseInt(matchColon[1]));
}
} else if (matchColon.length == 3) {
int startIndex = Integer.parseInt(matchColon[1]);
int endIndex = Integer.parseInt(matchColon[2]);
if (startIndex <= replacement.length() && endIndex <= replacement.length()) {
replacement = replacement.substring(startIndex, endIndex);
}
}
if (matchPipe.length == 2) {
replacement = formatString(matchPipe[1], replacement);
}
buffer.append(replacement);
}
Expand All @@ -119,7 +134,6 @@ public static String replaceTokens(String text, Map<String, String> replacements
}
}
return text;

}

public static String formatString(String format, String arg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,17 @@ public void testReplaceCurrentTimestamp() {
assertEquals(afterSql, FormatUtils.replaceTokens(beforeSql, replacementTokens, false));

}


@Test
public void testReplace() {
assertEquals(FormatUtils.replace("nodeId", "001", "nodeId = $(nodeId)"), "nodeId = 001");
assertEquals(FormatUtils.replace("nodeId", "001", "nodeId = $(nodeId:0)"), "nodeId = 001");
assertEquals(FormatUtils.replace("nodeId", "001", "nodeId = $(nodeId:0:10)"), "nodeId = 001");
assertEquals(FormatUtils.replace("nodeId", "1234567890ABC", "nodeId = $(nodeId:10)"), "nodeId = ABC");
assertEquals(FormatUtils.replace("nodeId", "1234567890ABC", "nodeId = $(nodeId:10:11)"), "nodeId = A");
assertEquals(FormatUtils.replace("nodeId", "001-002", "nodeId = $(nodeId:4)"), "nodeId = 002");
}

@Test
public void testIsWildcardMatch() {
assertTrue(FormatUtils.isWildCardMatch("TEST_1", "TEST_*"));
Expand Down

0 comments on commit 1565654

Please sign in to comment.